mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-02-15 07:25:14 -05:00)

Compare commits: 4 commits (docs/docum... → pulltip)
| Author | SHA1 | Date |
|---|---|---|
|  | 9692993b6a |  |
|  | 2ac30f5ce6 |  |
|  | 7418c00ad6 |  |
|  | 66342655fd |  |
@@ -88,9 +88,6 @@ func (s *Store) pullTips(state state.BeaconState, node *Node, jc, fc *ethpb.Chec
 		}
 	}
 	if uf.Epoch > s.unrealizedFinalizedCheckpoint.Epoch {
-		s.unrealizedJustifiedCheckpoint = &forkchoicetypes.Checkpoint{
-			Epoch: uj.Epoch, Root: bytesutil.ToBytes32(uj.Root),
-		}
 		s.unrealizedFinalizedCheckpoint = &forkchoicetypes.Checkpoint{
 			Epoch: uf.Epoch, Root: bytesutil.ToBytes32(uf.Root),
 		}
@@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
 	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
 	defer span.End()

+	if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+		return
+	}
+
 	versionHeader := r.Header.Get(api.VersionHeader)
 	if versionHeader == "" {
 		httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
@@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra(
 			},
 		})

-		targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
-		if err != nil {
-			return nil, nil, errors.Wrap(err, "could not get target state for attestation")
-		}
-		committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
-		if err != nil {
-			return nil, nil, errors.Wrap(err, "could not get committee for attestation")
-		}
-		att := singleAtt.ToAttestationElectra(committee)
-
-		wantedEpoch := slots.ToEpoch(att.Data.Slot)
+		// Broadcast first using CommitteeId directly (fast path)
+		// This matches gRPC behavior and avoids blocking on state fetching
+		wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot)
 		vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
 		if err != nil {
 			return nil, nil, errors.Wrap(err, "could not get head validator indices")
 		}
-		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
+		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot)
 		if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
 			failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
 				Index: i,
@@ -264,6 +260,23 @@ func (s *Server) handleAttestationsElectra(
 			}
 			continue
 		}
+	}
+
+	// Save to pool after broadcast (slow path - requires state fetching)
+	// Run in goroutine to avoid blocking the HTTP response
+	go func() {
+		for _, singleAtt := range validAttestations {
+			targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target)
+			if err != nil {
+				log.WithError(err).Error("Could not get target state for attestation")
+				continue
+			}
+			committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
+			if err != nil {
+				log.WithError(err).Error("Could not get committee for attestation")
+				continue
+			}
+			att := singleAtt.ToAttestationElectra(committee)

 			if features.Get().EnableExperimentalAttestationPool {
 				if err = s.AttestationCache.Add(att); err != nil {
@@ -275,6 +288,7 @@ func (s *Server) handleAttestationsElectra(
 				}
 			}
 		}
+	}()

 	if len(failedBroadcasts) > 0 {
 		log.WithFields(logrus.Fields{
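Read together, the three handleAttestationsElectra hunks above move the state-dependent work off the request path: the subnet is computed from the CommitteeId already carried by each SingleAttestation and the message is broadcast right away, while the committee lookup and pool save run afterwards in a goroutine. The sketch below is a condensed, self-contained illustration of that shape, not the actual handler; the att type and the broadcast/saveToPool helpers are hypothetical stand-ins for the Prysm types and calls shown in the diff.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// att is a hypothetical stand-in for the SingleAttestation type used in the hunks above.
type att struct{ committeeID uint64 }

// broadcast and saveToPool are hypothetical stand-ins for the Broadcaster and
// attestation-pool calls made by the real handler.
func broadcast(_ context.Context, a att) error  { fmt.Println("broadcast", a.committeeID); return nil }
func saveToPool(_ context.Context, a att) error { fmt.Println("saved", a.committeeID); return nil }

// handle shows the broadcast-first (fast path) / save-later (slow path) shape:
// broadcasting only needs the committee ID carried by the request, while the
// pool save, which needs a state fetch, runs in a goroutine.
func handle(ctx context.Context, atts []att, wg *sync.WaitGroup) {
	valid := make([]att, 0, len(atts))
	for _, a := range atts {
		if err := broadcast(ctx, a); err != nil {
			continue // the real handler records the failure and keeps going
		}
		valid = append(valid, a)
	}

	wg.Add(1)
	go func() { // slow path: does not block the HTTP response
		defer wg.Done()
		for _, a := range valid {
			_ = saveToPool(context.Background(), a) // fresh context, as in the diff
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	handle(context.Background(), []att{{committeeID: 1}, {committeeID: 2}}, &wg)
	wg.Wait()
}
```

Deferring the pool save this way is also what the new `time.Sleep(100 * time.Millisecond)` waits in the test hunks below account for.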
@@ -470,6 +484,10 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
 	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
 	defer span.End()

+	if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+		return
+	}
+
 	var req structs.SubmitSyncCommitteeSignaturesRequest
 	err := json.NewDecoder(r.Body).Decode(&req.Data)
 	switch {
@@ -26,6 +26,7 @@ import (
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
 	p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
+	mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
 	state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
 	"github.com/OffchainLabs/prysm/v7/config/params"
 	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		HeadFetcher:             chainService,
 		ChainInfoFetcher:        chainService,
 		TimeFetcher:             chainService,
+		OptimisticModeFetcher:   chainService,
+		SyncChecker:             &mockSync.Sync{IsSyncing: false},
 		OperationNotifier:       &blockchainmock.MockOperationNotifier{},
 		AttestationStateFetcher: chainService,
 	}
@@ -654,6 +657,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
 		assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("multiple", func(t *testing.T) {
@@ -673,6 +677,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		assert.Equal(t, http.StatusOK, writer.Code)
 		assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
 		assert.Equal(t, 2, broadcaster.NumAttestations())
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("phase0 att post electra", func(t *testing.T) {
@@ -793,6 +798,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
 		assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
 		assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("multiple", func(t *testing.T) {
@@ -812,6 +818,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
 		assert.Equal(t, http.StatusOK, writer.Code)
 		assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
 		assert.Equal(t, 2, broadcaster.NumAttestations())
+		time.Sleep(100 * time.Millisecond) // Wait for async pool save
 		assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
 	})
 	t.Run("no body", func(t *testing.T) {
@@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) {
 			assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature"))
 		})
 	})
+	t.Run("syncing", func(t *testing.T) {
+		chainService := &blockchainmock.ChainService{}
+		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: true},
+		}
+
+		var body bytes.Buffer
+		_, err := body.WriteString(singleAtt)
+		require.NoError(t, err)
+		request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+		request.Header.Set(api.VersionHeader, version.String(version.Phase0))
+		writer := httptest.NewRecorder()
+		writer.Body = &bytes.Buffer{}
+
+		s.SubmitAttestationsV2(writer, request)
+		assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+		assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+	})
 }

 func TestListVoluntaryExits(t *testing.T) {
@@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {

 	t.Run("single", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State:                st,
+			SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+		}
 		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P:               broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State:                st,
-					SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-				},
+				HeadFetcher:       chainService,
 			},
 		}

@@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 	})
 	t.Run("multiple", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State:                st,
+			SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+		}
 		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P:               broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State:                st,
-					SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-				},
+				HeadFetcher:       chainService,
 			},
 		}

@@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 	})
 	t.Run("invalid", func(t *testing.T) {
 		broadcaster := &p2pMock.MockBroadcaster{}
+		chainService := &blockchainmock.ChainService{
+			State: st,
+		}
 		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: false},
 			CoreService: &core.Service{
 				SyncCommitteePool: synccommittee.NewStore(),
 				P2P:               broadcaster,
-				HeadFetcher: &blockchainmock.ChainService{
-					State: st,
-				},
+				HeadFetcher:       chainService,
 			},
 		}

@@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
 	})
 	t.Run("empty", func(t *testing.T) {
-		s := &Server{}
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: false},
+		}

 		var body bytes.Buffer
 		_, err := body.WriteString("[]")
@@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
 	})
 	t.Run("no body", func(t *testing.T) {
-		s := &Server{}
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: false},
+		}

 		request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
 		writer := httptest.NewRecorder()
@@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
 		assert.Equal(t, http.StatusBadRequest, e.Code)
 		assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
 	})
+	t.Run("syncing", func(t *testing.T) {
+		chainService := &blockchainmock.ChainService{State: st}
+		s := &Server{
+			HeadFetcher:           chainService,
+			TimeFetcher:           chainService,
+			OptimisticModeFetcher: chainService,
+			SyncChecker:           &mockSync.Sync{IsSyncing: true},
+		}
+
+		var body bytes.Buffer
+		_, err := body.WriteString(singleSyncCommitteeMsg)
+		require.NoError(t, err)
+		request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+		writer := httptest.NewRecorder()
+		writer.Body = &bytes.Buffer{}
+
+		s.SubmitSyncCommitteeSignatures(writer, request)
+		assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+		assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+	})
 }

 func TestListBLSToExecutionChanges(t *testing.T) {
@@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
 	ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
 	defer span.End()

+	if vs.SyncChecker.Syncing() {
+		return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+	}
+
 	resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
 	if err != nil {
 		return nil, err
 	}

+	go func() {
 		if features.Get().EnableExperimentalAttestationPool {
-			if err = vs.AttestationCache.Add(att); err != nil {
+			if err := vs.AttestationCache.Add(att); err != nil {
 				log.WithError(err).Error("Could not save attestation")
 			}
 		} else {
-			go func() {
 			attCopy := att.Copy()
 			if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
 				log.WithError(err).Error("Could not save unaggregated attestation")
-				return
 			}
 		}
-		}()
-	}
+	}()

 	return resp, nil
 }
@@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
 	ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
 	defer span.End()

+	if vs.SyncChecker.Syncing() {
+		return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+	}
+
 	resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
 	if err != nil {
 		return nil, err
@@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp

 	singleAttCopy := singleAtt.Copy()
 	att := singleAttCopy.ToAttestationElectra(committee)
+	go func() {
 		if features.Get().EnableExperimentalAttestationPool {
-			if err = vs.AttestationCache.Add(att); err != nil {
+			if err := vs.AttestationCache.Add(att); err != nil {
 				log.WithError(err).Error("Could not save attestation")
 			}
 		} else {
-			go func() {
 			if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil {
 				log.WithError(err).Error("Could not save unaggregated attestation")
-				return
 			}
 		}
-		}()
-	}
+	}()

 	return resp, nil
 }
@@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) {
 		OperationNotifier:       (&mock.ChainService{}).OperationNotifier(),
 		TimeFetcher:             chainService,
 		AttestationStateFetcher: chainService,
+		SyncChecker:             &mockSync.Sync{IsSyncing: false},
 	}
 	head := util.NewBeaconBlock()
 	head.Block.Slot = 999
@@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
 		P2P:               &mockp2p.MockBroadcaster{},
 		AttPool:           attestations.NewPool(),
 		OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
+		SyncChecker:       &mockSync.Sync{IsSyncing: false},
 	}

 	req := util.HydrateAttestation(&ethpb.Attestation{})
@@ -149,6 +151,37 @@
 	assert.ErrorContains(t, wanted, err)
 }

+func TestProposeAttestation_Syncing(t *testing.T) {
+	attesterServer := &Server{
+		SyncChecker: &mockSync.Sync{IsSyncing: true},
+	}
+
+	req := util.HydrateAttestation(&ethpb.Attestation{})
+	_, err := attesterServer.ProposeAttestation(t.Context(), req)
+	assert.ErrorContains(t, "Syncing to latest head", err)
+	s, ok := status.FromError(err)
+	require.Equal(t, true, ok)
+	assert.Equal(t, codes.Unavailable, s.Code())
+}
+
+func TestProposeAttestationElectra_Syncing(t *testing.T) {
+	attesterServer := &Server{
+		SyncChecker: &mockSync.Sync{IsSyncing: true},
+	}
+
+	req := &ethpb.SingleAttestation{
+		Data: &ethpb.AttestationData{
+			Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
+			Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
+		},
+	}
+	_, err := attesterServer.ProposeAttestationElectra(t.Context(), req)
+	assert.ErrorContains(t, "Syncing to latest head", err)
+	s, ok := status.FromError(err)
+	require.Equal(t, true, ok)
+	assert.Equal(t, codes.Unavailable, s.Code())
+}
+
 func TestGetAttestationData_OK(t *testing.T) {
 	block := util.NewBeaconBlock()
 	block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1
@@ -3,6 +3,7 @@ package sync
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"maps"
 	"slices"
 	"sync"
@@ -243,8 +244,10 @@ func requestDirectSidecarsFromPeers(
 	}

 	// Compute missing indices by root, excluding those already in storage.
+	var lastRoot [fieldparams.RootLength]byte
 	missingIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, len(incompleteRoots))
 	for root := range incompleteRoots {
+		lastRoot = root
 		storedIndices := storedIndicesByRoot[root]

 		missingIndices := make(map[uint64]bool, len(requestedIndices))
@@ -259,6 +262,7 @@
 		}
 	}

+	initialMissingRootCount := len(missingIndicesByRoot)
 	initialMissingCount := computeTotalCount(missingIndicesByRoot)

 	indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)
@@ -301,11 +305,19 @@
 		}
 	}

-	log.WithFields(logrus.Fields{
-		"duration":            time.Since(start),
-		"initialMissingCount": initialMissingCount,
-		"finalMissingCount":   computeTotalCount(missingIndicesByRoot),
-	}).Debug("Requested direct data column sidecars from peers")
+	log := log.WithFields(logrus.Fields{
+		"duration":                time.Since(start),
+		"initialMissingRootCount": initialMissingRootCount,
+		"initialMissingCount":     initialMissingCount,
+		"finalMissingRootCount":   len(missingIndicesByRoot),
+		"finalMissingCount":       computeTotalCount(missingIndicesByRoot),
+	})
+
+	if initialMissingRootCount == 1 {
+		log = log.WithField("root", fmt.Sprintf("%#x", lastRoot))
+	}
+
+	log.Debug("Requested direct data column sidecars from peers")

 	return verifiedColumnsByRoot, nil
 }
@@ -3,9 +3,9 @@ package sync
 import (
 	"bytes"
 	"context"
-	"encoding/hex"
 	"fmt"
 	"slices"
+	"time"

 	"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
@@ -21,13 +21,23 @@ import (
 	"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
 	ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
 	"github.com/OffchainLabs/prysm/v7/runtime/version"
-	"github.com/OffchainLabs/prysm/v7/time"
 	"github.com/OffchainLabs/prysm/v7/time/slots"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )

-var pendingAttsLimit = 32768
+const pendingAttsLimit = 32768
+
+// aggregatorIndexFilter defines how aggregator index should be handled in equality checks.
+type aggregatorIndexFilter int
+
+const (
+	// ignoreAggregatorIndex means aggregates differing only by aggregator index are considered equal.
+	ignoreAggregatorIndex aggregatorIndexFilter = iota
+	// includeAggregatorIndex means aggregator index must also match for aggregates to be considered equal.
+	includeAggregatorIndex
+)

 // This method processes pending attestations as a "known" block as arrived. With validations,
 // the valid attestations get saved into the operation mem pool, and the invalid attestations gets deleted
@@ -50,16 +60,7 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
 	attestations := s.blkRootToPendingAtts[bRoot]
 	s.pendingAttsLock.RUnlock()

-	if len(attestations) > 0 {
-		start := time.Now()
 	s.processAttestations(ctx, attestations)
-		duration := time.Since(start)
-		log.WithFields(logrus.Fields{
-			"blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
-			"pendingAttsCount": len(attestations),
-			"duration":         duration,
-		}).Debug("Verified and saved pending attestations to pool")
-	}

 	randGen := rand.NewGenerator()
 	// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
@@ -79,26 +80,71 @@
 	return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
 }

+// processAttestations processes a list of attestations.
+// It assumes (for logging purposes only) that all attestations pertain to the same block.
 func (s *Service) processAttestations(ctx context.Context, attestations []any) {
 	if len(attestations) == 0 {
 		return
 	}

+	firstAttestation := attestations[0]
+	var blockRoot []byte
+	switch v := firstAttestation.(type) {
+	case ethpb.Att:
+		blockRoot = v.GetData().BeaconBlockRoot
+	case ethpb.SignedAggregateAttAndProof:
+		blockRoot = v.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot
+	default:
+		log.Warnf("Unexpected attestation type %T, skipping processing", v)
+		return
+	}
+
+	validAggregates := make([]ethpb.SignedAggregateAttAndProof, 0, len(attestations))
+	startAggregate := time.Now()
 	atts := make([]ethpb.Att, 0, len(attestations))
+	aggregateAttAndProofCount := 0
 	for _, att := range attestations {
 		switch v := att.(type) {
 		case ethpb.Att:
 			atts = append(atts, v)
 		case ethpb.SignedAggregateAttAndProof:
-			s.processAggregate(ctx, v)
+			aggregateAttAndProofCount++
+			// Avoid processing multiple aggregates only differing by aggregator index.
+			if slices.ContainsFunc(validAggregates, func(other ethpb.SignedAggregateAttAndProof) bool {
+				return pendingAggregatesAreEqual(v, other, ignoreAggregatorIndex)
+			}) {
+				continue
+			}
+
+			if err := s.processAggregate(ctx, v); err != nil {
+				log.WithError(err).Debug("Pending aggregate attestation could not be processed")
+				continue
+			}
+
+			validAggregates = append(validAggregates, v)
 		default:
 			log.Warnf("Unexpected attestation type %T, skipping", v)
 		}
 	}
+	durationAggregateAttAndProof := time.Since(startAggregate)
+
+	startAtts := time.Now()
 	for _, bucket := range bucketAttestationsByData(atts) {
 		s.processAttestationBucket(ctx, bucket)
 	}

+	durationAtts := time.Since(startAtts)
+
+	log.WithFields(logrus.Fields{
+		"blockRoot":                       fmt.Sprintf("%#x", blockRoot),
+		"totalCount":                      len(attestations),
+		"aggregateAttAndProofCount":       aggregateAttAndProofCount,
+		"uniqueAggregateAttAndProofCount": len(validAggregates),
+		"attCount":                        len(atts),
+		"durationTotal":                   durationAggregateAttAndProof + durationAtts,
+		"durationAggregateAttAndProof":    durationAggregateAttAndProof,
+		"durationAtts":                    durationAtts,
+	}).Debug("Verified and saved pending attestations to pool")
 }

 // attestationBucket groups attestations with the same AttestationData for batch processing.
@@ -303,21 +349,20 @@ func (s *Service) processVerifiedAttestation(
 	})
 }

-func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
+func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) error {
 	res, err := s.validateAggregatedAtt(ctx, aggregate)
 	if err != nil {
 		log.WithError(err).Debug("Pending aggregated attestation failed validation")
-		return
+		return errors.Wrap(err, "validate aggregated att")
 	}

 	if res != pubsub.ValidationAccept || !s.validateBlockInAttestation(ctx, aggregate) {
-		log.Debug("Pending aggregated attestation failed validation")
-		return
+		return errors.New("Pending aggregated attestation failed validation")
 	}

 	att := aggregate.AggregateAttestationAndProof().AggregateVal()
 	if err := s.saveAttestation(att); err != nil {
-		log.WithError(err).Debug("Could not save aggregated attestation")
-		return
+		return errors.Wrap(err, "save attestation")
 	}

 	_ = s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())
@@ -325,6 +370,8 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
 	if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
 		log.WithError(err).Debug("Could not broadcast aggregated attestation")
 	}
+
+	return nil
 }

 // This defines how pending aggregates are saved in the map. The key is the
@@ -336,7 +383,7 @@ func (s *Service) savePendingAggregate(agg ethpb.SignedAggregateAttAndProof) {

 	s.savePending(root, agg, func(other any) bool {
 		a, ok := other.(ethpb.SignedAggregateAttAndProof)
-		return ok && pendingAggregatesAreEqual(agg, a)
+		return ok && pendingAggregatesAreEqual(agg, a, includeAggregatorIndex)
 	})
 }

@@ -391,13 +438,19 @@ func (s *Service) savePending(root [32]byte, pending any, isEqual func(other any
 	s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], pending)
 }

-func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
+// pendingAggregatesAreEqual checks if two pending aggregate attestations are equal.
+// The filter parameter controls whether aggregator index is considered in the equality check.
+func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof, filter aggregatorIndexFilter) bool {
 	if a.Version() != b.Version() {
 		return false
 	}
+
+	if filter == includeAggregatorIndex {
 		if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
 			return false
 		}
+	}
+
 	aAtt := a.AggregateAttestationAndProof().AggregateVal()
 	bAtt := b.AggregateAttestationAndProof().AggregateVal()
 	if aAtt.GetData().Slot != bAtt.GetData().Slot {
@@ -94,7 +94,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
 	// Process block A (which exists and has no pending attestations)
 	// This should skip processing attestations for A and request blocks B and C
 	require.NoError(t, r.processPendingAttsForBlock(t.Context(), rootA))
-	require.LogsContain(t, hook, "Requesting block by root")
+	require.LogsContain(t, hook, "Requesting blocks by root")
 }

 func TestProcessPendingAtts_HasBlockSaveUnaggregatedAtt(t *testing.T) {
@@ -911,17 +911,17 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
 			},
 			AggregationBits: bitfield.Bitlist{0b1111},
 		}}}
-		assert.Equal(t, true, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, true, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
 	})
 	t.Run("different version", func(t *testing.T) {
 		a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
 		b := &ethpb.SignedAggregateAttestationAndProofElectra{Message: &ethpb.AggregateAttestationAndProofElectra{AggregatorIndex: 1}}
-		assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
 	})
 	t.Run("different aggregator index", func(t *testing.T) {
 		a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
 		b := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 2}}
-		assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
 	})
 	t.Run("different slot", func(t *testing.T) {
 		a := &ethpb.SignedAggregateAttestationAndProof{
@@ -942,7 +942,7 @@
 			},
 			AggregationBits: bitfield.Bitlist{0b1111},
 		}}}
-		assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
 	})
 	t.Run("different committee index", func(t *testing.T) {
 		a := &ethpb.SignedAggregateAttestationAndProof{
@@ -963,7 +963,7 @@
 			},
 			AggregationBits: bitfield.Bitlist{0b1111},
 		}}}
-		assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
 	})
 	t.Run("different aggregation bits", func(t *testing.T) {
 		a := &ethpb.SignedAggregateAttestationAndProof{
@@ -984,7 +984,30 @@
 			},
 			AggregationBits: bitfield.Bitlist{0b1000},
 		}}}
-		assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+		assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
+	})
+	t.Run("different aggregator index should be equal while ignoring aggregator index", func(t *testing.T) {
+		a := &ethpb.SignedAggregateAttestationAndProof{
+			Message: &ethpb.AggregateAttestationAndProof{
+				AggregatorIndex: 1,
+				Aggregate: &ethpb.Attestation{
+					Data: &ethpb.AttestationData{
+						Slot:           1,
+						CommitteeIndex: 1,
+					},
+					AggregationBits: bitfield.Bitlist{0b1111},
+				}}}
+		b := &ethpb.SignedAggregateAttestationAndProof{
+			Message: &ethpb.AggregateAttestationAndProof{
+				AggregatorIndex: 2,
+				Aggregate: &ethpb.Attestation{
+					Data: &ethpb.AttestationData{
+						Slot:           1,
+						CommitteeIndex: 1,
+					},
+					AggregationBits: bitfield.Bitlist{0b1111},
+				}}}
+		assert.Equal(t, true, pendingAggregatesAreEqual(a, b, ignoreAggregatorIndex))
 	})
 }

@@ -2,7 +2,6 @@ package sync

 import (
 	"context"
-	"encoding/hex"
 	"fmt"
 	"slices"
 	"sync"
@@ -44,11 +43,13 @@ func (s *Service) processPendingBlocksQueue()
 		if !s.chainIsStarted() {
 			return
 		}

 		locker.Lock()
+		defer locker.Unlock()
+
 		if err := s.processPendingBlocks(s.ctx); err != nil {
 			log.WithError(err).Debug("Could not process pending blocks")
 		}
-		locker.Unlock()
 	})
 }
@@ -73,8 +74,10 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
 	randGen := rand.NewGenerator()
 	var parentRoots [][32]byte

+	blkRoots := make([][32]byte, 0, len(sortedSlots)*maxBlocksPerSlot)
+
 	// Iterate through sorted slots.
-	for _, slot := range sortedSlots {
+	for i, slot := range sortedSlots {
 		// Skip processing if slot is in the future.
 		if slot > s.cfg.clock.CurrentSlot() {
 			continue
@@ -91,6 +94,9 @@

 		// Process each block in the queue.
 		for _, b := range blocksInCache {
+			start := time.Now()
+			totalDuration := time.Duration(0)
+
 			if err := blocks.BeaconBlockIsNil(b); err != nil {
 				continue
 			}
@@ -147,19 +153,34 @@
 			}
 			cancelFunction()

-			// Process pending attestations for this block.
-			if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
-				log.WithError(err).Debug("Failed to process pending attestations for block")
-			}
+			blkRoots = append(blkRoots, blkRoot)

 			// Remove the processed block from the queue.
 			if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
 				return err
 			}
-			log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache")
+
+			duration := time.Since(start)
+			totalDuration += duration
+			log.WithFields(logrus.Fields{
+				"slotIndex":     fmt.Sprintf("%d/%d", i+1, len(sortedSlots)),
+				"slot":          slot,
+				"root":          fmt.Sprintf("%#x", blkRoot),
+				"duration":      duration,
+				"totalDuration": totalDuration,
+			}).Debug("Processed pending block and cleared it in cache")
 		}
+
 		span.End()
 	}

+	for _, blkRoot := range blkRoots {
+		// Process pending attestations for this block.
+		if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
+			log.WithError(err).Debug("Failed to process pending attestations for block")
+		}
+	}
+
 	return s.sendBatchRootRequest(ctx, parentRoots, randGen)
 }
@@ -379,6 +400,19 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
 		req = roots[:maxReqBlock]
 	}

+	if logrus.GetLevel() >= logrus.DebugLevel {
+		rootsStr := make([]string, 0, len(roots))
+		for _, req := range roots {
+			rootsStr = append(rootsStr, fmt.Sprintf("%#x", req))
+		}
+
+		log.WithFields(logrus.Fields{
+			"peer":  pid,
+			"count": len(req),
+			"roots": rootsStr,
+		}).Debug("Requesting blocks by root")
+	}
+
 	// Send the request to the peer.
 	if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
 		tracing.AnnotateError(span, err)
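The hunk above also gates construction of the per-root debug fields behind the logger level, so the fmt.Sprintf work is only done when a debug message can actually be emitted. A minimal, self-contained sketch of the same guard (the roots slice here is made up for illustration):

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

func main() {
	roots := [][32]byte{{0x01}, {0x02}} // made-up block roots for illustration

	// Build the string fields only when the debug message can actually be emitted,
	// mirroring the logrus.GetLevel() guard in the hunk above.
	if logrus.GetLevel() >= logrus.DebugLevel {
		rootsStr := make([]string, 0, len(roots))
		for _, root := range roots {
			rootsStr = append(rootsStr, fmt.Sprintf("%#x", root))
		}
		logrus.WithField("roots", rootsStr).Debug("Requesting blocks by root")
	}
}
```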
@@ -438,8 +472,6 @@ func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte
 			roots = append(roots[:i], roots[i+1:]...)
 			continue
 		}
-
-		log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
 	}
 	return roots
 }
@@ -51,14 +51,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
 	// Decode the message, reject if it fails.
 	m, err := s.decodePubsubMessage(msg)
 	if err != nil {
-		log.WithError(err).Error("Failed to decode message")
 		return pubsub.ValidationReject, err
 	}

 	// Reject messages that are not of the expected type.
 	dcsc, ok := m.(*eth.DataColumnSidecar)
 	if !ok {
-		log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
 		return pubsub.ValidationReject, errWrongMessage
 	}

@@ -361,7 +361,7 @@ func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.
 	}

 	if !dv.fc.HasNode(parentRoot) {
-		return columnErrBuilder(errSidecarParentNotSeen)
+		return columnErrBuilder(errors.Wrapf(errSidecarParentNotSeen, "parent root: %#x", parentRoot))
 	}
 }

changelog/james-prysm_align-atter-pool-apis.md (new file)
@@ -0,0 +1,3 @@
+### Changed
+
+- The /eth/v2/beacon/pool/attestations and /eth/v1/beacon/pool/sync_committees endpoints now return a 503 error if the node is still syncing. The REST API also now broadcasts immediately, matching the gRPC flow.
changelog/manu-agg.md
Normal file
3
changelog/manu-agg.md
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
### Changed
|
||||||
|
|
||||||
|
- Pending aggregates: When multiple aggregated attestations only differing by the aggregator index are in the pending queue, only process one of them.
|
||||||
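This relies on an equality check that can ignore the aggregator index, so pending aggregates that differ only in who aggregated them collapse to a single entry before validation (see the processAttestations and pendingAggregatesAreEqual hunks above). A small self-contained sketch of that dedup step, with a simplified stand-in type instead of ethpb.SignedAggregateAttAndProof:

```go
package main

import (
	"fmt"
	"slices"
)

// aggregate is a simplified stand-in for ethpb.SignedAggregateAttAndProof.
type aggregate struct {
	AggregatorIndex uint64
	Slot            uint64
	CommitteeIndex  uint64
}

// equalIgnoringAggregator mirrors pendingAggregatesAreEqual(..., ignoreAggregatorIndex):
// everything except the aggregator index has to match.
func equalIgnoringAggregator(a, b aggregate) bool {
	return a.Slot == b.Slot && a.CommitteeIndex == b.CommitteeIndex
}

func main() {
	pending := []aggregate{
		{AggregatorIndex: 1, Slot: 10, CommitteeIndex: 3},
		{AggregatorIndex: 2, Slot: 10, CommitteeIndex: 3}, // same aggregate, different aggregator
		{AggregatorIndex: 5, Slot: 11, CommitteeIndex: 0},
	}

	unique := make([]aggregate, 0, len(pending))
	for _, agg := range pending {
		if slices.ContainsFunc(unique, func(other aggregate) bool {
			return equalIgnoringAggregator(agg, other)
		}) {
			continue // an equivalent aggregate is already queued for processing
		}
		unique = append(unique, agg)
	}
	fmt.Println(len(unique)) // 2
}
```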
changelog/manu-remove-error-logs.md (new file)
@@ -0,0 +1,2 @@
+### Changed
+- `validateDataColumn`: Remove error logs.
changelog/satushh-pulltip.md (new file)
@@ -0,0 +1,3 @@
+### Fixed
+
+- Removed redundant justified checkpoint update in pullTips.