Compare commits

...

5 Commits

Author SHA1 Message Date
terence
7950a24926 feat(primitives): add BuilderIndex SSZ type (#16169)
This pr adds primitives.BuilderIndex for builder registry indexing in
Gloas
2025-12-20 04:29:42 +00:00
Potuz
ea51253be9 Do not process slots and copy state for payload attributes post Fulu (#16168)
When computing payload attributes post-Fulu, we do not need to process
slots, nor copy the state if we need to find out if the node is
proposing in the next slot. This prevents an immediate epoch processing
after block 31 is processed unless we are actually proposing.
2025-12-19 22:03:52 +00:00
Manu NALEPA
2ac30f5ce6 Pending aggregates: When multiple aggregated attestations only differing by the aggregator index are in the pending queue, only process one of them. (#16153)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
When an (potentially aggregated) attestation is received **before** the
block being voted for, Prysm queues this attestation, then processes the
queue when the block has been received.

This behavior is consistent with the [Phase0 specification
](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id).

> [IGNORE] The block being voted for
(attestation.data.beacon_block_root) has been seen (via gossip or
non-gossip sources) (a client MAY queue attestations for processing once
block is retrieved).

Once the block being voted for is processed, previously queued
(potentially aggregated) attestations are then processed, and
broadcasted.

Processing (potentially aggregated) attestations takes some non
negligible time. For this reason, (potentially aggregated) attestations
are deduplicated before being introduced into the pending queue, to
avoid eventually processing duplicates.

Before this PR, two aggregated attestations were considered duplicated
if all of the following conditions were gathered:
1. Attestations have the same version, 
2. **Attestations have the same aggregator index (aka., the same
validator aggregated them)**,
3. Attestations have the same slot, 
4. Attestations have the same committee index, and
5. Attestations have the same aggregation bits

Aggregated attestations are then broadcasted.
The final purpose of aggregated attestations is to be packed into the
next block by the next proposer.
When packing attestations, the aggregator index is not used any more.

This pull request modifies the deduplication function used in the
pending aggregated attestations queue by considering that multiple
aggregated attestations only differing by the aggregator index are
equivalent (removing `2.` of the previous list.)

As a consequence, the count of aggregated attestations to be introduced
in the pending queue is reduced from 1 aggregated attestation by
aggregator to, in the best case,
[MAX_COMMITTEE_PER_SLOT=64](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#misc-1).

Also, only a single aggregated attestation for a given version, slot,
committee index and aggregation bits will be re-broadcasted. This is a
correct behavior, since no data to be included in a block will be lost.
(We can even say that this will reduce by a bit the total networking
volume.)

**How to test**:
1. Start a beacon node (preferably, on a slow computer) from a
checkpoint.
2. Filter logs containing `Synced new block` and `Verified and saved
pending attestations to pool`. (You can pipe logs into `grep -E "Synced
new block|Verified and saved pending attestations to pool"`.

- In `Synced new block` logs, monitor the `sinceSlotStartTime` value.
This should monotonically decrease.
- In `Verified and saved pending attestations to pool`, monitor the
`pendingAggregateAttAndProofCount` value. It should be a "honest" value.
"honest" is not really quantifiable here, since it depends on the
aggregators. But it's likely to be less than
`5*MAX_COMMITTEE_PER_SLOT=320`.

**Which issues(s) does this PR fix?**

Partially fixes:
- https://github.com/OffchainLabs/prysm/issues/16160

**Other notes for review**
Please read commit by commit, with commit messages.
The important commit is b748c04a67.

**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-19 14:05:50 +00:00
Manu NALEPA
7418c00ad6 validateDataColumn: Remove error logs. (#16157)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
When we receive data column sidecars via gossip, if the sidecar does not
respect the validation rules, a scary ERROR log is displayed. We can't
to anything about it, since the error comes from an invalid incoming
sidecar, so there is no need to print an ERROR message.

Node: As all REJECTED gossip message, a DEBUG log is also always
displayed.

Example of ERROR log:
```
[2025-12-18 15:38:26.46] ERROR sync: Failed to decode message error=invalid ssz encoding. first variable element offset indexes into fixed value data
[2025-12-18 15:38:26.46] DEBUG sync: Gossip message was rejected agent=erigon/caplin error=invalid ssz encoding. first variable element offset indexes into fixed value data gossipScore=0 multiaddress=/ip4/141.147.32.105/tcp/9000 peerID=16Uiu2HAmHu88k97iBist1vJg7cPNuTjJFRARKvDF7yaH3Pv3Vmso topic=/eth2/c6ecb76c/data_column_sidecar_30/ssz_snappy
```

(After this PR, the DEBUG one will still be printed.)

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-18 16:18:02 +00:00
james-prysm
66342655fd throw 503 error when submit attestation and sync committee are called on syncing node + align changes to gRPC (#16152)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Bug fix


**What does this PR do? Why is it needed?**

Prysm starting throwing this error `Could not write response message"
error="write tcp 10.104.92.212:5052->10.104.92.196:41876: write: broken
pipe` because a validator got attestation data from a synced node and
submitted attestation to a syncing node, when the node couldn't replay
the state the validator context deadlined and disconnected but the
writer when it finally responded tries to write it gets this broken pipe
error.

applies to `/eth/v2/beacon/pool/attestations` and
`/eth/v1/beacon/pool/sync_committees`

the solution is 2 part.
1. we shouldn't allow submission of an attestation if the node is
syncing because we can't save the attestation without the state
information.
2. we were doing the expensive state call before broadcast before in
rest and now it should match gRPC where it happens afterward in its own
go routine.

Tested manually running kurtosis with rest validators

```
participants:
 # Super-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   count: 2
   supernode: true
   cl_extra_params:
     - --subscribe-all-subnets
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 # Full-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   validator_count: 63
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug
   validator_count: 13

additional_services:
 - dora
 - spamoor

spamoor_params:
 image: ethpandaops/spamoor:master
 max_mem: 4000
 spammers:
   - scenario: eoatx
     config:
       throughput: 200
   - scenario: blobs
     config:
       throughput: 20

network_params:
  fulu_fork_epoch: 2
  bpo_1_epoch: 8
  bpo_1_max_blobs: 21
  withdrawal_type: "0x02"
  preset: mainnet
  seconds_per_slot: 6

global_log_level: debug
```

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-18 15:07:09 +00:00
19 changed files with 511 additions and 102 deletions

View File

@@ -323,14 +323,17 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
var ok bool
e := slots.ToEpoch(slot)
stateEpoch := slots.ToEpoch(st.Slot())
if e == stateEpoch {
fuluAndNextEpoch := st.Version() >= version.Fulu && e == stateEpoch+1
if e == stateEpoch || fuluAndNextEpoch {
val, ok = s.trackedProposer(st, slot)
if !ok {
return emptyAttri
}
}
st = st.Copy()
if slot > st.Slot() {
// At this point either we know we are proposing on a future slot or we need to still compute the
// right proposer index pre-Fulu, either way we need to copy the state to process it.
st = st.Copy()
var err error
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, headRoot, slot)
if err != nil {
@@ -338,7 +341,7 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
return emptyAttri
}
}
if e > stateEpoch {
if e > stateEpoch && !fuluAndNextEpoch {
emptyAttri := payloadattribute.EmptyWithVersion(st.Version())
val, ok = s.trackedProposer(st, slot)
if !ok {

View File

@@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
defer span.End()
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
return
}
versionHeader := r.Header.Get(api.VersionHeader)
if versionHeader == "" {
httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
@@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra(
},
})
targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get target state for attestation")
}
committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get committee for attestation")
}
att := singleAtt.ToAttestationElectra(committee)
wantedEpoch := slots.ToEpoch(att.Data.Slot)
// Broadcast first using CommitteeId directly (fast path)
// This matches gRPC behavior and avoids blocking on state fetching
wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get head validator indices")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
@@ -264,17 +260,35 @@ func (s *Server) handleAttestationsElectra(
}
continue
}
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
// Save to pool after broadcast (slow path - requires state fetching)
// Run in goroutine to avoid blocking the HTTP response
go func() {
for _, singleAtt := range validAttestations {
targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target)
if err != nil {
log.WithError(err).Error("Could not get target state for attestation")
continue
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
if err != nil {
log.WithError(err).Error("Could not get committee for attestation")
continue
}
att := singleAtt.ToAttestationElectra(committee)
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
}
}
}
}()
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
@@ -470,6 +484,10 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
defer span.End()
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
return
}
var req structs.SubmitSyncCommitteeSignaturesRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {

View File

@@ -26,6 +26,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) {
HeadFetcher: chainService,
ChainInfoFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
OperationNotifier: &blockchainmock.MockOperationNotifier{},
AttestationStateFetcher: chainService,
}
@@ -654,6 +657,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("multiple", func(t *testing.T) {
@@ -673,6 +677,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("phase0 att post electra", func(t *testing.T) {
@@ -793,6 +798,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("multiple", func(t *testing.T) {
@@ -812,6 +818,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("no body", func(t *testing.T) {
@@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature"))
})
})
t.Run("syncing", func(t *testing.T) {
chainService := &blockchainmock.ChainService{}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
var body bytes.Buffer
_, err := body.WriteString(singleAtt)
require.NoError(t, err)
request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
request.Header.Set(api.VersionHeader, version.String(version.Phase0))
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
})
}
func TestListVoluntaryExits(t *testing.T) {
@@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
t.Run("single", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
HeadFetcher: chainService,
},
}
@@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
HeadFetcher: chainService,
},
}
@@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
})
t.Run("invalid", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
},
HeadFetcher: chainService,
},
}
@@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
})
t.Run("empty", func(t *testing.T) {
s := &Server{}
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
var body bytes.Buffer
_, err := body.WriteString("[]")
@@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
})
t.Run("no body", func(t *testing.T) {
s := &Server{}
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
writer := httptest.NewRecorder()
@@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, e.Code)
assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
})
t.Run("syncing", func(t *testing.T) {
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
var body bytes.Buffer
_, err := body.WriteString(singleSyncCommitteeMsg)
require.NoError(t, err)
request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.SubmitSyncCommitteeSignatures(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
})
}
func TestListBLSToExecutionChanges(t *testing.T) {

View File

@@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
if err != nil {
return nil, err
}
if features.Get().EnableExperimentalAttestationPool {
if err = vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
go func() {
go func() {
if features.Get().EnableExperimentalAttestationPool {
if err := vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
attCopy := att.Copy()
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
return
}
}()
}
}
}()
return resp, nil
}
@@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
if err != nil {
return nil, err
@@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
singleAttCopy := singleAtt.Copy()
att := singleAttCopy.ToAttestationElectra(committee)
if features.Get().EnableExperimentalAttestationPool {
if err = vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
go func() {
go func() {
if features.Get().EnableExperimentalAttestationPool {
if err := vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
return
}
}()
}
}
}()
return resp, nil
}

View File

@@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) {
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
TimeFetcher: chainService,
AttestationStateFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
head := util.NewBeaconBlock()
head.Block.Slot = 999
@@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
P2P: &mockp2p.MockBroadcaster{},
AttPool: attestations.NewPool(),
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
req := util.HydrateAttestation(&ethpb.Attestation{})
@@ -149,6 +151,37 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
assert.ErrorContains(t, wanted, err)
}
func TestProposeAttestation_Syncing(t *testing.T) {
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
req := util.HydrateAttestation(&ethpb.Attestation{})
_, err := attesterServer.ProposeAttestation(t.Context(), req)
assert.ErrorContains(t, "Syncing to latest head", err)
s, ok := status.FromError(err)
require.Equal(t, true, ok)
assert.Equal(t, codes.Unavailable, s.Code())
}
func TestProposeAttestationElectra_Syncing(t *testing.T) {
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
req := &ethpb.SingleAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
},
}
_, err := attesterServer.ProposeAttestationElectra(t.Context(), req)
assert.ErrorContains(t, "Syncing to latest head", err)
s, ok := status.FromError(err)
require.Equal(t, true, ok)
assert.Equal(t, codes.Unavailable, s.Code())
}
func TestGetAttestationData_OK(t *testing.T) {
block := util.NewBeaconBlock()
block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1

View File

@@ -3,6 +3,7 @@ package sync
import (
"bytes"
"context"
"fmt"
"maps"
"slices"
"sync"
@@ -243,8 +244,10 @@ func requestDirectSidecarsFromPeers(
}
// Compute missing indices by root, excluding those already in storage.
var lastRoot [fieldparams.RootLength]byte
missingIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, len(incompleteRoots))
for root := range incompleteRoots {
lastRoot = root
storedIndices := storedIndicesByRoot[root]
missingIndices := make(map[uint64]bool, len(requestedIndices))
@@ -259,6 +262,7 @@ func requestDirectSidecarsFromPeers(
}
}
initialMissingRootCount := len(missingIndicesByRoot)
initialMissingCount := computeTotalCount(missingIndicesByRoot)
indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)
@@ -301,11 +305,19 @@ func requestDirectSidecarsFromPeers(
}
}
log.WithFields(logrus.Fields{
"duration": time.Since(start),
"initialMissingCount": initialMissingCount,
"finalMissingCount": computeTotalCount(missingIndicesByRoot),
}).Debug("Requested direct data column sidecars from peers")
log := log.WithFields(logrus.Fields{
"duration": time.Since(start),
"initialMissingRootCount": initialMissingRootCount,
"initialMissingCount": initialMissingCount,
"finalMissingRootCount": len(missingIndicesByRoot),
"finalMissingCount": computeTotalCount(missingIndicesByRoot),
})
if initialMissingRootCount == 1 {
log = log.WithField("root", fmt.Sprintf("%#x", lastRoot))
}
log.Debug("Requested direct data column sidecars from peers")
return verifiedColumnsByRoot, nil
}

View File

@@ -3,9 +3,9 @@ package sync
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"slices"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
@@ -21,13 +21,23 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var pendingAttsLimit = 32768
const pendingAttsLimit = 32768
// aggregatorIndexFilter defines how aggregator index should be handled in equality checks.
type aggregatorIndexFilter int
const (
// ignoreAggregatorIndex means aggregates differing only by aggregator index are considered equal.
ignoreAggregatorIndex aggregatorIndexFilter = iota
// includeAggregatorIndex means aggregator index must also match for aggregates to be considered equal.
includeAggregatorIndex
)
// This method processes pending attestations as a "known" block as arrived. With validations,
// the valid attestations get saved into the operation mem pool, and the invalid attestations gets deleted
@@ -50,16 +60,7 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
if len(attestations) > 0 {
start := time.Now()
s.processAttestations(ctx, attestations)
duration := time.Since(start)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
"duration": duration,
}).Debug("Verified and saved pending attestations to pool")
}
s.processAttestations(ctx, attestations)
randGen := rand.NewGenerator()
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
@@ -79,26 +80,71 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
// processAttestations processes a list of attestations.
// It assumes (for logging purposes only) that all attestations pertain to the same block.
func (s *Service) processAttestations(ctx context.Context, attestations []any) {
if len(attestations) == 0 {
return
}
firstAttestation := attestations[0]
var blockRoot []byte
switch v := firstAttestation.(type) {
case ethpb.Att:
blockRoot = v.GetData().BeaconBlockRoot
case ethpb.SignedAggregateAttAndProof:
blockRoot = v.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot
default:
log.Warnf("Unexpected attestation type %T, skipping processing", v)
return
}
validAggregates := make([]ethpb.SignedAggregateAttAndProof, 0, len(attestations))
startAggregate := time.Now()
atts := make([]ethpb.Att, 0, len(attestations))
aggregateAttAndProofCount := 0
for _, att := range attestations {
switch v := att.(type) {
case ethpb.Att:
atts = append(atts, v)
case ethpb.SignedAggregateAttAndProof:
s.processAggregate(ctx, v)
aggregateAttAndProofCount++
// Avoid processing multiple aggregates only differing by aggregator index.
if slices.ContainsFunc(validAggregates, func(other ethpb.SignedAggregateAttAndProof) bool {
return pendingAggregatesAreEqual(v, other, ignoreAggregatorIndex)
}) {
continue
}
if err := s.processAggregate(ctx, v); err != nil {
log.WithError(err).Debug("Pending aggregate attestation could not be processed")
continue
}
validAggregates = append(validAggregates, v)
default:
log.Warnf("Unexpected attestation type %T, skipping", v)
}
}
durationAggregateAttAndProof := time.Since(startAggregate)
startAtts := time.Now()
for _, bucket := range bucketAttestationsByData(atts) {
s.processAttestationBucket(ctx, bucket)
}
durationAtts := time.Since(startAtts)
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", blockRoot),
"totalCount": len(attestations),
"aggregateAttAndProofCount": aggregateAttAndProofCount,
"uniqueAggregateAttAndProofCount": len(validAggregates),
"attCount": len(atts),
"durationTotal": durationAggregateAttAndProof + durationAtts,
"durationAggregateAttAndProof": durationAggregateAttAndProof,
"durationAtts": durationAtts,
}).Debug("Verified and saved pending attestations to pool")
}
// attestationBucket groups attestations with the same AttestationData for batch processing.
@@ -303,21 +349,20 @@ func (s *Service) processVerifiedAttestation(
})
}
func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) error {
res, err := s.validateAggregatedAtt(ctx, aggregate)
if err != nil {
log.WithError(err).Debug("Pending aggregated attestation failed validation")
return
return errors.Wrap(err, "validate aggregated att")
}
if res != pubsub.ValidationAccept || !s.validateBlockInAttestation(ctx, aggregate) {
log.Debug("Pending aggregated attestation failed validation")
return
return errors.New("Pending aggregated attestation failed validation")
}
att := aggregate.AggregateAttestationAndProof().AggregateVal()
if err := s.saveAttestation(att); err != nil {
log.WithError(err).Debug("Could not save aggregated attestation")
return
return errors.Wrap(err, "save attestation")
}
_ = s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())
@@ -325,6 +370,8 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
log.WithError(err).Debug("Could not broadcast aggregated attestation")
}
return nil
}
// This defines how pending aggregates are saved in the map. The key is the
@@ -336,7 +383,7 @@ func (s *Service) savePendingAggregate(agg ethpb.SignedAggregateAttAndProof) {
s.savePending(root, agg, func(other any) bool {
a, ok := other.(ethpb.SignedAggregateAttAndProof)
return ok && pendingAggregatesAreEqual(agg, a)
return ok && pendingAggregatesAreEqual(agg, a, includeAggregatorIndex)
})
}
@@ -391,13 +438,19 @@ func (s *Service) savePending(root [32]byte, pending any, isEqual func(other any
s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], pending)
}
func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
// pendingAggregatesAreEqual checks if two pending aggregate attestations are equal.
// The filter parameter controls whether aggregator index is considered in the equality check.
func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof, filter aggregatorIndexFilter) bool {
if a.Version() != b.Version() {
return false
}
if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
return false
if filter == includeAggregatorIndex {
if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
return false
}
}
aAtt := a.AggregateAttestationAndProof().AggregateVal()
bAtt := b.AggregateAttestationAndProof().AggregateVal()
if aAtt.GetData().Slot != bAtt.GetData().Slot {

View File

@@ -94,7 +94,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
// Process block A (which exists and has no pending attestations)
// This should skip processing attestations for A and request blocks B and C
require.NoError(t, r.processPendingAttsForBlock(t.Context(), rootA))
require.LogsContain(t, hook, "Requesting block by root")
require.LogsContain(t, hook, "Requesting blocks by root")
}
func TestProcessPendingAtts_HasBlockSaveUnaggregatedAtt(t *testing.T) {
@@ -911,17 +911,17 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, true, pendingAggregatesAreEqual(a, b))
assert.Equal(t, true, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different version", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
b := &ethpb.SignedAggregateAttestationAndProofElectra{Message: &ethpb.AggregateAttestationAndProofElectra{AggregatorIndex: 1}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregator index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
b := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 2}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different slot", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -942,7 +942,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different committee index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -963,7 +963,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregation bits", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -984,7 +984,30 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1000},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregator index should be equal while ignoring aggregator index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
Message: &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 1,
Aggregate: &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 1,
CommitteeIndex: 1,
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
b := &ethpb.SignedAggregateAttestationAndProof{
Message: &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 2,
Aggregate: &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 1,
CommitteeIndex: 1,
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, true, pendingAggregatesAreEqual(a, b, ignoreAggregatorIndex))
})
}

View File

@@ -2,7 +2,6 @@ package sync
import (
"context"
"encoding/hex"
"fmt"
"slices"
"sync"
@@ -44,11 +43,13 @@ func (s *Service) processPendingBlocksQueue() {
if !s.chainIsStarted() {
return
}
locker.Lock()
defer locker.Unlock()
if err := s.processPendingBlocks(s.ctx); err != nil {
log.WithError(err).Debug("Could not process pending blocks")
}
locker.Unlock()
})
}
@@ -73,8 +74,10 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
randGen := rand.NewGenerator()
var parentRoots [][32]byte
blkRoots := make([][32]byte, 0, len(sortedSlots)*maxBlocksPerSlot)
// Iterate through sorted slots.
for _, slot := range sortedSlots {
for i, slot := range sortedSlots {
// Skip processing if slot is in the future.
if slot > s.cfg.clock.CurrentSlot() {
continue
@@ -91,6 +94,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
// Process each block in the queue.
for _, b := range blocksInCache {
start := time.Now()
totalDuration := time.Duration(0)
if err := blocks.BeaconBlockIsNil(b); err != nil {
continue
}
@@ -147,19 +153,34 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
cancelFunction()
// Process pending attestations for this block.
if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
log.WithError(err).Debug("Failed to process pending attestations for block")
}
blkRoots = append(blkRoots, blkRoot)
// Remove the processed block from the queue.
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
return err
}
log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache")
duration := time.Since(start)
totalDuration += duration
log.WithFields(logrus.Fields{
"slotIndex": fmt.Sprintf("%d/%d", i+1, len(sortedSlots)),
"slot": slot,
"root": fmt.Sprintf("%#x", blkRoot),
"duration": duration,
"totalDuration": totalDuration,
}).Debug("Processed pending block and cleared it in cache")
}
span.End()
}
for _, blkRoot := range blkRoots {
// Process pending attestations for this block.
if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
log.WithError(err).Debug("Failed to process pending attestations for block")
}
}
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}
@@ -379,6 +400,19 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
req = roots[:maxReqBlock]
}
if logrus.GetLevel() >= logrus.DebugLevel {
rootsStr := make([]string, 0, len(roots))
for _, req := range roots {
rootsStr = append(rootsStr, fmt.Sprintf("%#x", req))
}
log.WithFields(logrus.Fields{
"peer": pid,
"count": len(req),
"roots": rootsStr,
}).Debug("Requesting blocks by root")
}
// Send the request to the peer.
if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
tracing.AnnotateError(span, err)
@@ -438,8 +472,6 @@ func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte
roots = append(roots[:i], roots[i+1:]...)
continue
}
log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
}
return roots
}

View File

@@ -51,14 +51,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
// Decode the message, reject if it fails.
m, err := s.decodePubsubMessage(msg)
if err != nil {
log.WithError(err).Error("Failed to decode message")
return pubsub.ValidationReject, err
}
// Reject messages that are not of the expected type.
dcsc, ok := m.(*eth.DataColumnSidecar)
if !ok {
log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
return pubsub.ValidationReject, errWrongMessage
}

View File

@@ -361,7 +361,7 @@ func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.
}
if !dv.fc.HasNode(parentRoot) {
return columnErrBuilder(errSidecarParentNotSeen)
return columnErrBuilder(errors.Wrapf(errSidecarParentNotSeen, "parent root: %#x", parentRoot))
}
}

View File

@@ -0,0 +1,3 @@
### Added
- `primitives.BuilderIndex`: SSZ `uint64` wrapper for builder registry indices.

View File

@@ -0,0 +1,3 @@
### Changed
- the /eth/v2/beacon/pool/attestations and /eth/v1/beacon/pool/sync_committees now returns a 503 error if the node is still syncing, the rest api is also working in a similar process to gRPC broadcasting immediately now.

3
changelog/manu-agg.md Normal file
View File

@@ -0,0 +1,3 @@
### Changed
- Pending aggregates: When multiple aggregated attestations only differing by the aggregator index are in the pending queue, only process one of them.

View File

@@ -0,0 +1,2 @@
### Changed
- `validateDataColumn`: Remove error logs.

View File

@@ -0,0 +1,3 @@
### Fixed
- Do not process slots and copy states for next epoch proposers after Fulu

View File

@@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"basis_points.go",
"builder_index.go",
"committee_bits_mainnet.go",
"committee_bits_minimal.go", # keep
"committee_index.go",
@@ -31,6 +32,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"builder_index_test.go",
"committee_index_test.go",
"domain_test.go",
"epoch_test.go",

View File

@@ -0,0 +1,54 @@
package primitives
import (
"fmt"
fssz "github.com/prysmaticlabs/fastssz"
)
var _ fssz.HashRoot = (BuilderIndex)(0)
var _ fssz.Marshaler = (*BuilderIndex)(nil)
var _ fssz.Unmarshaler = (*BuilderIndex)(nil)
// BuilderIndex is an index into the builder registry.
type BuilderIndex uint64
// HashTreeRoot returns the SSZ hash tree root of the index.
func (b BuilderIndex) HashTreeRoot() ([32]byte, error) {
return fssz.HashWithDefaultHasher(b)
}
// HashTreeRootWith appends the SSZ uint64 representation of the index to the given hasher.
func (b BuilderIndex) HashTreeRootWith(hh *fssz.Hasher) error {
hh.PutUint64(uint64(b))
return nil
}
// UnmarshalSSZ decodes the SSZ-encoded uint64 index from buf.
func (b *BuilderIndex) UnmarshalSSZ(buf []byte) error {
if len(buf) != b.SizeSSZ() {
return fmt.Errorf("expected buffer of length %d received %d", b.SizeSSZ(), len(buf))
}
*b = BuilderIndex(fssz.UnmarshallUint64(buf))
return nil
}
// MarshalSSZTo appends the SSZ-encoded index to dst and returns the extended buffer.
func (b *BuilderIndex) MarshalSSZTo(dst []byte) ([]byte, error) {
marshalled, err := b.MarshalSSZ()
if err != nil {
return nil, err
}
return append(dst, marshalled...), nil
}
// MarshalSSZ encodes the index as an SSZ uint64.
func (b *BuilderIndex) MarshalSSZ() ([]byte, error) {
marshalled := fssz.MarshalUint64([]byte{}, uint64(*b))
return marshalled, nil
}
// SizeSSZ returns the size of the SSZ-encoded index in bytes.
func (b *BuilderIndex) SizeSSZ() int {
return 8
}

View File

@@ -0,0 +1,86 @@
package primitives_test
import (
"encoding/binary"
"slices"
"strconv"
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestBuilderIndex_SSZRoundTripAndHashRoot(t *testing.T) {
cases := []uint64{
0,
1,
42,
(1 << 32) - 1,
1 << 32,
^uint64(0),
}
for _, v := range cases {
t.Run("v="+u64name(v), func(t *testing.T) {
t.Parallel()
val := primitives.BuilderIndex(v)
require.Equal(t, 8, (&val).SizeSSZ())
enc, err := (&val).MarshalSSZ()
require.NoError(t, err)
require.Equal(t, 8, len(enc))
wantEnc := make([]byte, 8)
binary.LittleEndian.PutUint64(wantEnc, v)
require.DeepEqual(t, wantEnc, enc)
dstPrefix := []byte("prefix:")
dst, err := (&val).MarshalSSZTo(slices.Clone(dstPrefix))
require.NoError(t, err)
wantDst := append(dstPrefix, wantEnc...)
require.DeepEqual(t, wantDst, dst)
var decoded primitives.BuilderIndex
require.NoError(t, (&decoded).UnmarshalSSZ(enc))
require.Equal(t, val, decoded)
root, err := val.HashTreeRoot()
require.NoError(t, err)
var wantRoot [32]byte
binary.LittleEndian.PutUint64(wantRoot[:8], v)
require.Equal(t, wantRoot, root)
})
}
}
func TestBuilderIndex_UnmarshalSSZRejectsWrongSize(t *testing.T) {
for _, size := range []int{7, 9} {
t.Run("size="+strconv.Itoa(size), func(t *testing.T) {
t.Parallel()
var v primitives.BuilderIndex
err := (&v).UnmarshalSSZ(make([]byte, size))
require.ErrorContains(t, "expected buffer of length 8", err)
})
}
}
func u64name(v uint64) string {
switch v {
case 0:
return "0"
case 1:
return "1"
case 42:
return "42"
case (1 << 32) - 1:
return "2^32-1"
case 1 << 32:
return "2^32"
case ^uint64(0):
return "max"
default:
return "custom"
}
}