Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-02-03 09:35:00 -05:00.
Compare commits: 12 commits (docs/docum... → paralleliz...).
Commits compared (SHA1):
cfe59706af, b113d6bbde, 45577ef931, 92865adfe7, a3863c118b, 730e6500e3,
dac2c65004, dbfb987e1d, 3596d00ff9, 2ac30f5ce6, 7418c00ad6, 66342655fd
@@ -38,6 +38,7 @@ go_library(
         "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
         "@com_github_sirupsen_logrus//:go_default_library",
         "@com_github_spf13_afero//:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )

@@ -25,6 +25,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/time/slots"
     "github.com/pkg/errors"
     "github.com/spf13/afero"
+    "golang.org/x/sync/errgroup"
 )

 const (

@@ -625,7 +626,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
     }

     // Create the SSZ encoded data column sidecars.
-    var sszEncodedDataColumnSidecars []byte
+    var sszEncodedDataColumnSidecarsBytes []byte

     // Initialize the count of the saved SSZ encoded data column sidecar.
     storedCount := uint8(0)

@@ -636,7 +637,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             break
         }

-        for _, dataColumnSidecar := range dataColumnSidecars {
+        var wg errgroup.Group
+        sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
+            wg.Go(func() error {
+                // SSZ encode the data column sidecar.
+                sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+                if err != nil {
+                    return errors.Wrap(err, "data column sidecar marshal SSZ")
+                }
+
+                sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
+                return nil
+            })
+        }
+
+        if err := wg.Wait(); err != nil {
+            return err
+        }
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
             // Extract the data columns index.
             dataColumnIndex := dataColumnSidecar.Index

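This hunk (and its twin in saveDataColumnSidecarsNewFile further down) replaces a serial MarshalSSZ loop with a fan-out over golang.org/x/sync/errgroup: every sidecar is encoded in its own goroutine into a pre-sized slice, and a second, sequential loop then consumes the results in their original order. The sketch below is a minimal, self-contained illustration of that pattern; Sidecar and encode are placeholder names, not Prysm types.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Sidecar is a placeholder for any value that is expensive to serialize.
type Sidecar struct{ Index uint64 }

// encode stands in for MarshalSSZ.
func encode(s Sidecar) ([]byte, error) {
	return []byte{byte(s.Index)}, nil
}

// encodeAll serializes every sidecar concurrently while preserving input order.
func encodeAll(sidecars []Sidecar) ([][]byte, error) {
	var g errgroup.Group
	out := make([][]byte, len(sidecars)) // one slot per goroutine, so no mutex is needed

	for i, sc := range sidecars {
		g.Go(func() error { // Go 1.22+: i and sc are per-iteration variables
			b, err := encode(sc)
			if err != nil {
				return fmt.Errorf("encode sidecar %d: %w", sc.Index, err)
			}
			out[i] = b
			return nil
		})
	}

	// Wait returns the first non-nil error reported by any goroutine.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	encoded, err := encodeAll([]Sidecar{{Index: 1}, {Index: 2}, {Index: 3}})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(encoded)) // 3
}
```

Only the CPU-bound serialization runs concurrently; the ordering-sensitive work (size checks and appending to the output buffer) stays in the sequential second pass, as in the diff.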
@@ -658,10 +678,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             }

             // SSZ encode the data column sidecar.
-            sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
-            if err != nil {
-                return errors.Wrap(err, "data column sidecar marshal SSZ")
-            }
+            sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]

             // Compute the size of the SSZ encoded data column sidecar.
             incomingSszEncodedDataColumnSidecarSize := uint32(len(sszEncodedDataColumnSidecar))

@@ -680,7 +697,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             storedCount++

             // Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
-            sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+            sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
         }
     }

@@ -695,11 +712,11 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
     }

     // Append the SSZ encoded data column sidecars to the end of the file.
-    count, err = file.WriteAt(sszEncodedDataColumnSidecars, metadata.fileSize)
+    count, err = file.WriteAt(sszEncodedDataColumnSidecarsBytes, metadata.fileSize)
     if err != nil {
         return errors.Wrap(err, "write SSZ encoded data column sidecars")
     }
-    if count != len(sszEncodedDataColumnSidecars) {
+    if count != len(sszEncodedDataColumnSidecarsBytes) {
         return errWrongBytesWritten
     }

@@ -721,7 +738,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp

     var (
         sszEncodedDataColumnSidecarRefSize int
-        sszEncodedDataColumnSidecars       []byte
+        sszEncodedDataColumnSidecarsBytes  []byte
     )

     // Initialize the count of the saved SSZ encoded data column sidecar.

@@ -733,7 +750,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             break
         }

-        for _, dataColumnSidecar := range dataColumnSidecars {
+        var wg errgroup.Group
+        sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
+            wg.Go(func() error {
+                // SSZ encode the first data column sidecar.
+                sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+                if err != nil {
+                    return errors.Wrap(err, "data column sidecar marshal SSZ")
+                }
+
+                sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
+                return nil
+            })
+        }
+
+        if err := wg.Wait(); err != nil {
+            return err
+        }
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
             // Extract the data column index.
             dataColumnIndex := dataColumnSidecar.Index

@@ -756,10 +792,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             storedCount++

             // SSZ encode the first data column sidecar.
-            sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
-            if err != nil {
-                return errors.Wrap(err, "data column sidecar marshal SSZ")
-            }
+            sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]

             // Check if the size of the SSZ encoded data column sidecar is correct.
             if sszEncodedDataColumnSidecarRefSize != 0 && len(sszEncodedDataColumnSidecar) != sszEncodedDataColumnSidecarRefSize {

@@ -770,7 +803,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             sszEncodedDataColumnSidecarRefSize = len(sszEncodedDataColumnSidecar)

             // Append the first SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
-            sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+            sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
         }
     }

@@ -807,12 +840,12 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
     rawIndices := indices.raw()

     // Concatenate the version, the data column sidecar size, the data column indices and the SSZ encoded data column sidecar.
-    countToWrite := headerSize + len(sszEncodedDataColumnSidecars)
+    countToWrite := headerSize + len(sszEncodedDataColumnSidecarsBytes)
     bytes := make([]byte, 0, countToWrite)
     bytes = append(bytes, byte(version))
     bytes = append(bytes, encodedSszEncodedDataColumnSidecarSize[:]...)
     bytes = append(bytes, rawIndices[:]...)
-    bytes = append(bytes, sszEncodedDataColumnSidecars...)
+    bytes = append(bytes, sszEncodedDataColumnSidecarsBytes...)

     countWritten, err := file.Write(bytes)
     if err != nil {

@@ -40,6 +40,7 @@ go_library(
         "//beacon-chain/state/state-native:go_default_library",
         "//beacon-chain/state/stategen:go_default_library",
         "//beacon-chain/verification:go_default_library",
+        "//cmd/beacon-chain/flags:go_default_library",
         "//config/fieldparams:go_default_library",
         "//config/params:go_default_library",
         "//consensus-types/blocks:go_default_library",

@@ -11,6 +11,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
+    "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
     fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
     "github.com/OffchainLabs/prysm/v7/config/params"
     "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"

@@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
         return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2))
     }

+    if flags.Get().DisableGetBlobsV2 {
+        return []*pb.BlobAndProofV2{}, nil
+    }
+
     result := make([]*pb.BlobAndProofV2, len(versionedHashes))
     err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes)

@@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
     ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
     defer span.End()

+    if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+        return
+    }
+
     versionHeader := r.Header.Get(api.VersionHeader)
     if versionHeader == "" {
         httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)

@@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra(
             },
         })

-        targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
-        if err != nil {
-            return nil, nil, errors.Wrap(err, "could not get target state for attestation")
-        }
-        committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
-        if err != nil {
-            return nil, nil, errors.Wrap(err, "could not get committee for attestation")
-        }
-        att := singleAtt.ToAttestationElectra(committee)
-
-        wantedEpoch := slots.ToEpoch(att.Data.Slot)
+        // Broadcast first using CommitteeId directly (fast path)
+        // This matches gRPC behavior and avoids blocking on state fetching
+        wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot)
         vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
         if err != nil {
             return nil, nil, errors.Wrap(err, "could not get head validator indices")
         }
-        subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
+        subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot)
         if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
             failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
                 Index: i,

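The fast path above can compute the target subnet straight from the SingleAttestation's CommitteeId because the subnet derivation only needs the committee index, the slot, and the committee count; Prysm's ComputeSubnetFromCommitteeAndSlot derives the committee count from the active validator set and then applies the consensus-spec formula from compute_subnet_for_attestation. A rough sketch of that arithmetic, with simplified constants and the committees-per-slot value passed in directly rather than derived:

```go
package main

import "fmt"

const (
	slotsPerEpoch          = 32
	attestationSubnetCount = 64
)

// computeSubnet mirrors the spec's compute_subnet_for_attestation:
// attestation subnets are striped across all committees of an epoch.
func computeSubnet(committeesPerSlot, committeeIndex, slot uint64) uint64 {
	slotsSinceEpochStart := slot % slotsPerEpoch
	committeesSinceEpochStart := committeesPerSlot * slotsSinceEpochStart
	return (committeesSinceEpochStart + committeeIndex) % attestationSubnetCount
}

func main() {
	// e.g. 4 committees per slot, committee 2, third slot of the epoch
	fmt.Println(computeSubnet(4, 2, 66)) // (4*2 + 2) % 64 = 10
}
```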
@@ -264,6 +260,23 @@ func (s *Server) handleAttestationsElectra(
                 }
                 continue
             }
+        }
+
+        // Save to pool after broadcast (slow path - requires state fetching)
+        // Run in goroutine to avoid blocking the HTTP response
+        go func() {
+            for _, singleAtt := range validAttestations {
+                targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target)
+                if err != nil {
+                    log.WithError(err).Error("Could not get target state for attestation")
+                    continue
+                }
+                committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
+                if err != nil {
+                    log.WithError(err).Error("Could not get committee for attestation")
+                    continue
+                }
+                att := singleAtt.ToAttestationElectra(committee)

                 if features.Get().EnableExperimentalAttestationPool {
                     if err = s.AttestationCache.Add(att); err != nil {

@@ -275,6 +288,7 @@ func (s *Server) handleAttestationsElectra(
                 }
             }
         }
+    }()

     if len(failedBroadcasts) > 0 {
         log.WithFields(logrus.Fields{

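Taken together, these hunks restructure handleAttestationsElectra into "broadcast now, persist later": the subnet broadcast runs synchronously, while the state-dependent committee lookup and pool write move into a goroutine that completes after the HTTP response (which is why the tests below gain a time.Sleep before asserting on pool contents). A minimal sketch of that request-handling shape, with placeholder types rather than Prysm's:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type attestation struct{ slot uint64 }

// broadcast stands in for the cheap, latency-sensitive work.
func broadcast(att attestation) error {
	fmt.Println("broadcast at slot", att.slot)
	return nil
}

// persist stands in for the slow, state-dependent work (e.g. fetching the
// target state and committee before writing to the pool).
func persist(ctx context.Context, att attestation) error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("persisted at slot", att.slot)
	return nil
}

func handle(atts []attestation, wg *sync.WaitGroup) {
	valid := make([]attestation, 0, len(atts))
	for _, att := range atts {
		if err := broadcast(att); err != nil {
			continue // report the per-item failure to the client, as the handler does
		}
		valid = append(valid, att)
	}

	// The slow path runs after the response is sent; a background context is
	// used because the request context ends when the handler returns.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, att := range valid {
			if err := persist(context.Background(), att); err != nil {
				fmt.Println("could not persist:", err)
			}
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	handle([]attestation{{slot: 1}, {slot: 2}}, &wg)
	wg.Wait() // callers that need the pool state must wait or poll, hence the sleeps in the tests
}
```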
@@ -470,6 +484,10 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
     ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
     defer span.End()

+    if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
+        return
+    }
+
     var req structs.SubmitSyncCommitteeSignaturesRequest
     err := json.NewDecoder(r.Body).Decode(&req.Data)
     switch {

@@ -26,6 +26,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
     p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
+    mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
     state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
     "github.com/OffchainLabs/prysm/v7/config/params"
     "github.com/OffchainLabs/prysm/v7/consensus-types/primitives"

@@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) {
         HeadFetcher:             chainService,
         ChainInfoFetcher:        chainService,
         TimeFetcher:             chainService,
+        OptimisticModeFetcher:   chainService,
+        SyncChecker:             &mockSync.Sync{IsSyncing: false},
         OperationNotifier:       &blockchainmock.MockOperationNotifier{},
         AttestationStateFetcher: chainService,
     }

@@ -654,6 +657,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
         assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
         assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
         assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+        time.Sleep(100 * time.Millisecond) // Wait for async pool save
         assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
     })
     t.Run("multiple", func(t *testing.T) {

@@ -673,6 +677,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
         assert.Equal(t, http.StatusOK, writer.Code)
         assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
         assert.Equal(t, 2, broadcaster.NumAttestations())
+        time.Sleep(100 * time.Millisecond) // Wait for async pool save
         assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
     })
     t.Run("phase0 att post electra", func(t *testing.T) {

@@ -793,6 +798,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
         assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
         assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
         assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
+        time.Sleep(100 * time.Millisecond) // Wait for async pool save
         assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
     })
     t.Run("multiple", func(t *testing.T) {

@@ -812,6 +818,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
         assert.Equal(t, http.StatusOK, writer.Code)
         assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
         assert.Equal(t, 2, broadcaster.NumAttestations())
+        time.Sleep(100 * time.Millisecond) // Wait for async pool save
         assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
     })
     t.Run("no body", func(t *testing.T) {

@@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) {
             assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature"))
         })
     })
+    t.Run("syncing", func(t *testing.T) {
+        chainService := &blockchainmock.ChainService{}
+        s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: true},
+        }
+
+        var body bytes.Buffer
+        _, err := body.WriteString(singleAtt)
+        require.NoError(t, err)
+        request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+        request.Header.Set(api.VersionHeader, version.String(version.Phase0))
+        writer := httptest.NewRecorder()
+        writer.Body = &bytes.Buffer{}
+
+        s.SubmitAttestationsV2(writer, request)
+        assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+        assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+    })
 }

 func TestListVoluntaryExits(t *testing.T) {

@@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {

     t.Run("single", func(t *testing.T) {
         broadcaster := &p2pMock.MockBroadcaster{}
+        chainService := &blockchainmock.ChainService{
+            State:                st,
+            SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+        }
         s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: false},
             CoreService: &core.Service{
                 SyncCommitteePool: synccommittee.NewStore(),
                 P2P:               broadcaster,
-                HeadFetcher: &blockchainmock.ChainService{
-                    State:                st,
-                    SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-                },
+                HeadFetcher:       chainService,
             },
         }

@@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
     })
     t.Run("multiple", func(t *testing.T) {
         broadcaster := &p2pMock.MockBroadcaster{}
+        chainService := &blockchainmock.ChainService{
+            State:                st,
+            SyncCommitteeIndices: []primitives.CommitteeIndex{0},
+        }
         s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: false},
             CoreService: &core.Service{
                 SyncCommitteePool: synccommittee.NewStore(),
                 P2P:               broadcaster,
-                HeadFetcher: &blockchainmock.ChainService{
-                    State:                st,
-                    SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-                },
+                HeadFetcher:       chainService,
             },
         }

@@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
     })
     t.Run("invalid", func(t *testing.T) {
         broadcaster := &p2pMock.MockBroadcaster{}
+        chainService := &blockchainmock.ChainService{
+            State: st,
+        }
         s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: false},
             CoreService: &core.Service{
                 SyncCommitteePool: synccommittee.NewStore(),
                 P2P:               broadcaster,
-                HeadFetcher: &blockchainmock.ChainService{
-                    State: st,
-                },
+                HeadFetcher:       chainService,
             },
         }

@@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
         assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
     })
     t.Run("empty", func(t *testing.T) {
-        s := &Server{}
+        chainService := &blockchainmock.ChainService{State: st}
+        s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: false},
+        }
+
         var body bytes.Buffer
         _, err := body.WriteString("[]")

@@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
         assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
     })
     t.Run("no body", func(t *testing.T) {
-        s := &Server{}
+        chainService := &blockchainmock.ChainService{State: st}
+        s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: false},
+        }
+
         request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
         writer := httptest.NewRecorder()

@@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
         assert.Equal(t, http.StatusBadRequest, e.Code)
         assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
     })
+    t.Run("syncing", func(t *testing.T) {
+        chainService := &blockchainmock.ChainService{State: st}
+        s := &Server{
+            HeadFetcher:           chainService,
+            TimeFetcher:           chainService,
+            OptimisticModeFetcher: chainService,
+            SyncChecker:           &mockSync.Sync{IsSyncing: true},
+        }
+
+        var body bytes.Buffer
+        _, err := body.WriteString(singleSyncCommitteeMsg)
+        require.NoError(t, err)
+        request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
+        writer := httptest.NewRecorder()
+        writer.Body = &bytes.Buffer{}
+
+        s.SubmitSyncCommitteeSignatures(writer, request)
+        assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
+        assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
+    })
 }

 func TestListBLSToExecutionChanges(t *testing.T) {

@@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
     ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
     defer span.End()

+    if vs.SyncChecker.Syncing() {
+        return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+    }
+
     resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
     if err != nil {
         return nil, err
     }

+    go func() {
         if features.Get().EnableExperimentalAttestationPool {
-            if err = vs.AttestationCache.Add(att); err != nil {
+            if err := vs.AttestationCache.Add(att); err != nil {
                 log.WithError(err).Error("Could not save attestation")
             }
         } else {
-            go func() {
             attCopy := att.Copy()
             if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
                 log.WithError(err).Error("Could not save unaggregated attestation")
-                return
             }
-            }()
-    }
+        }
+    }()

     return resp, nil
 }

@@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
     ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
     defer span.End()

+    if vs.SyncChecker.Syncing() {
+        return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
+    }
+
     resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
     if err != nil {
         return nil, err

@@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp

     singleAttCopy := singleAtt.Copy()
     att := singleAttCopy.ToAttestationElectra(committee)
+    go func() {
         if features.Get().EnableExperimentalAttestationPool {
-            if err = vs.AttestationCache.Add(att); err != nil {
+            if err := vs.AttestationCache.Add(att); err != nil {
                 log.WithError(err).Error("Could not save attestation")
             }
         } else {
-            go func() {
             if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil {
                 log.WithError(err).Error("Could not save unaggregated attestation")
-                return
             }
-            }()
-    }
+        }
+    }()

     return resp, nil
 }

@@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) {
         OperationNotifier:       (&mock.ChainService{}).OperationNotifier(),
         TimeFetcher:             chainService,
         AttestationStateFetcher: chainService,
+        SyncChecker:             &mockSync.Sync{IsSyncing: false},
     }
     head := util.NewBeaconBlock()
     head.Block.Slot = 999

@@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
         P2P:               &mockp2p.MockBroadcaster{},
         AttPool:           attestations.NewPool(),
         OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
+        SyncChecker:       &mockSync.Sync{IsSyncing: false},
     }

     req := util.HydrateAttestation(&ethpb.Attestation{})

@@ -149,6 +151,37 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
     assert.ErrorContains(t, wanted, err)
 }

+func TestProposeAttestation_Syncing(t *testing.T) {
+    attesterServer := &Server{
+        SyncChecker: &mockSync.Sync{IsSyncing: true},
+    }
+
+    req := util.HydrateAttestation(&ethpb.Attestation{})
+    _, err := attesterServer.ProposeAttestation(t.Context(), req)
+    assert.ErrorContains(t, "Syncing to latest head", err)
+    s, ok := status.FromError(err)
+    require.Equal(t, true, ok)
+    assert.Equal(t, codes.Unavailable, s.Code())
+}
+
+func TestProposeAttestationElectra_Syncing(t *testing.T) {
+    attesterServer := &Server{
+        SyncChecker: &mockSync.Sync{IsSyncing: true},
+    }
+
+    req := &ethpb.SingleAttestation{
+        Data: &ethpb.AttestationData{
+            Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
+            Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
+        },
+    }
+    _, err := attesterServer.ProposeAttestationElectra(t.Context(), req)
+    assert.ErrorContains(t, "Syncing to latest head", err)
+    s, ok := status.FromError(err)
+    require.Equal(t, true, ok)
+    assert.Equal(t, codes.Unavailable, s.Code())
+}
+
 func TestGetAttestationData_OK(t *testing.T) {
     block := util.NewBeaconBlock()
     block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1

@@ -3,6 +3,7 @@ package sync
 import (
     "bytes"
     "context"
+    "fmt"
     "maps"
     "slices"
     "sync"

@@ -243,8 +244,10 @@ func requestDirectSidecarsFromPeers(
     }

     // Compute missing indices by root, excluding those already in storage.
+    var lastRoot [fieldparams.RootLength]byte
     missingIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, len(incompleteRoots))
     for root := range incompleteRoots {
+        lastRoot = root
         storedIndices := storedIndicesByRoot[root]

         missingIndices := make(map[uint64]bool, len(requestedIndices))

@@ -259,6 +262,7 @@ func requestDirectSidecarsFromPeers(
         }
     }

+    initialMissingRootCount := len(missingIndicesByRoot)
     initialMissingCount := computeTotalCount(missingIndicesByRoot)

     indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)

@@ -301,11 +305,19 @@ func requestDirectSidecarsFromPeers(
         }
     }

-    log.WithFields(logrus.Fields{
-        "duration":            time.Since(start),
-        "initialMissingCount": initialMissingCount,
-        "finalMissingCount":   computeTotalCount(missingIndicesByRoot),
-    }).Debug("Requested direct data column sidecars from peers")
+    log := log.WithFields(logrus.Fields{
+        "duration":                time.Since(start),
+        "initialMissingRootCount": initialMissingRootCount,
+        "initialMissingCount":     initialMissingCount,
+        "finalMissingRootCount":   len(missingIndicesByRoot),
+        "finalMissingCount":       computeTotalCount(missingIndicesByRoot),
+    })
+
+    if initialMissingRootCount == 1 {
+        log = log.WithField("root", fmt.Sprintf("%#x", lastRoot))
+    }
+
+    log.Debug("Requested direct data column sidecars from peers")

     return verifiedColumnsByRoot, nil
 }

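The logging change above builds the logrus entry first and attaches the root field only when exactly one root was missing. A tiny sketch of that "accumulate fields, enrich conditionally, emit once" pattern (field names here are illustrative):

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

func main() {
	logrus.SetLevel(logrus.DebugLevel)

	initialMissingRootCount := 1
	lastRoot := [32]byte{0xab}

	// Build the entry once with the always-present fields.
	entry := logrus.WithFields(logrus.Fields{
		"initialMissingRootCount": initialMissingRootCount,
		"finalMissingRootCount":   0,
	})

	// Only attach the verbose root field when it is unambiguous.
	if initialMissingRootCount == 1 {
		entry = entry.WithField("root", fmt.Sprintf("%#x", lastRoot))
	}

	entry.Debug("Requested direct data column sidecars from peers")
}
```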
@@ -3,9 +3,9 @@ package sync
 import (
     "bytes"
     "context"
-    "encoding/hex"
     "fmt"
     "slices"
+    "time"

     "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"

@@ -21,13 +21,23 @@ import (
     "github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
     ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
     "github.com/OffchainLabs/prysm/v7/runtime/version"
-    "github.com/OffchainLabs/prysm/v7/time"
     "github.com/OffchainLabs/prysm/v7/time/slots"
     pubsub "github.com/libp2p/go-libp2p-pubsub"
+    "github.com/pkg/errors"
     "github.com/sirupsen/logrus"
 )

-var pendingAttsLimit = 32768
+const pendingAttsLimit = 32768
+
+// aggregatorIndexFilter defines how aggregator index should be handled in equality checks.
+type aggregatorIndexFilter int
+
+const (
+    // ignoreAggregatorIndex means aggregates differing only by aggregator index are considered equal.
+    ignoreAggregatorIndex aggregatorIndexFilter = iota
+    // includeAggregatorIndex means aggregator index must also match for aggregates to be considered equal.
+    includeAggregatorIndex
+)

 // This method processes pending attestations as a "known" block as arrived. With validations,
 // the valid attestations get saved into the operation mem pool, and the invalid attestations gets deleted

@@ -50,16 +60,7 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
     attestations := s.blkRootToPendingAtts[bRoot]
     s.pendingAttsLock.RUnlock()

-    if len(attestations) > 0 {
-        start := time.Now()
-        s.processAttestations(ctx, attestations)
-        duration := time.Since(start)
-        log.WithFields(logrus.Fields{
-            "blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
-            "pendingAttsCount": len(attestations),
-            "duration":         duration,
-        }).Debug("Verified and saved pending attestations to pool")
-    }
+    s.processAttestations(ctx, attestations)

     randGen := rand.NewGenerator()
     // Delete the missing block root key from pending attestation queue so a node will not request for the block again.

@@ -79,26 +80,71 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
     return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
 }

+// processAttestations processes a list of attestations.
+// It assumes (for logging purposes only) that all attestations pertain to the same block.
 func (s *Service) processAttestations(ctx context.Context, attestations []any) {
     if len(attestations) == 0 {
         return
     }

+    firstAttestation := attestations[0]
+    var blockRoot []byte
+    switch v := firstAttestation.(type) {
+    case ethpb.Att:
+        blockRoot = v.GetData().BeaconBlockRoot
+    case ethpb.SignedAggregateAttAndProof:
+        blockRoot = v.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot
+    default:
+        log.Warnf("Unexpected attestation type %T, skipping processing", v)
+        return
+    }
+
+    validAggregates := make([]ethpb.SignedAggregateAttAndProof, 0, len(attestations))
+    startAggregate := time.Now()
     atts := make([]ethpb.Att, 0, len(attestations))
+    aggregateAttAndProofCount := 0
     for _, att := range attestations {
         switch v := att.(type) {
         case ethpb.Att:
             atts = append(atts, v)
         case ethpb.SignedAggregateAttAndProof:
-            s.processAggregate(ctx, v)
+            aggregateAttAndProofCount++
+            // Avoid processing multiple aggregates only differing by aggregator index.
+            if slices.ContainsFunc(validAggregates, func(other ethpb.SignedAggregateAttAndProof) bool {
+                return pendingAggregatesAreEqual(v, other, ignoreAggregatorIndex)
+            }) {
+                continue
+            }
+
+            if err := s.processAggregate(ctx, v); err != nil {
+                log.WithError(err).Debug("Pending aggregate attestation could not be processed")
+                continue
+            }
+
+            validAggregates = append(validAggregates, v)
         default:
             log.Warnf("Unexpected attestation type %T, skipping", v)
         }
     }
+    durationAggregateAttAndProof := time.Since(startAggregate)
+
+    startAtts := time.Now()
     for _, bucket := range bucketAttestationsByData(atts) {
         s.processAttestationBucket(ctx, bucket)
     }
+
+    durationAtts := time.Since(startAtts)
+
+    log.WithFields(logrus.Fields{
+        "blockRoot":                       fmt.Sprintf("%#x", blockRoot),
+        "totalCount":                      len(attestations),
+        "aggregateAttAndProofCount":       aggregateAttAndProofCount,
+        "uniqueAggregateAttAndProofCount": len(validAggregates),
+        "attCount":                        len(atts),
+        "durationTotal":                   durationAggregateAttAndProof + durationAtts,
+        "durationAggregateAttAndProof":    durationAggregateAttAndProof,
+        "durationAtts":                    durationAtts,
+    }).Debug("Verified and saved pending attestations to pool")
 }

 // attestationBucket groups attestations with the same AttestationData for batch processing.

@@ -303,21 +349,20 @@ func (s *Service) processVerifiedAttestation(
     })
 }

-func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
+func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) error {
     res, err := s.validateAggregatedAtt(ctx, aggregate)
     if err != nil {
         log.WithError(err).Debug("Pending aggregated attestation failed validation")
-        return
+        return errors.Wrap(err, "validate aggregated att")
     }

     if res != pubsub.ValidationAccept || !s.validateBlockInAttestation(ctx, aggregate) {
-        log.Debug("Pending aggregated attestation failed validation")
-        return
+        return errors.New("Pending aggregated attestation failed validation")
     }

     att := aggregate.AggregateAttestationAndProof().AggregateVal()
     if err := s.saveAttestation(att); err != nil {
-        log.WithError(err).Debug("Could not save aggregated attestation")
-        return
+        return errors.Wrap(err, "save attestation")
     }

     _ = s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())

@@ -325,6 +370,8 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
     if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
         log.WithError(err).Debug("Could not broadcast aggregated attestation")
     }
+
+    return nil
 }

 // This defines how pending aggregates are saved in the map. The key is the

@@ -336,7 +383,7 @@ func (s *Service) savePendingAggregate(agg ethpb.SignedAggregateAttAndProof) {

     s.savePending(root, agg, func(other any) bool {
         a, ok := other.(ethpb.SignedAggregateAttAndProof)
-        return ok && pendingAggregatesAreEqual(agg, a)
+        return ok && pendingAggregatesAreEqual(agg, a, includeAggregatorIndex)
     })
 }

@@ -391,13 +438,19 @@ func (s *Service) savePending(root [32]byte, pending any, isEqual func(other any
     s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], pending)
 }

-func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
+// pendingAggregatesAreEqual checks if two pending aggregate attestations are equal.
+// The filter parameter controls whether aggregator index is considered in the equality check.
+func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof, filter aggregatorIndexFilter) bool {
     if a.Version() != b.Version() {
         return false
     }

-    if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
-        return false
+    if filter == includeAggregatorIndex {
+        if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
+            return false
+        }
     }

     aAtt := a.AggregateAttestationAndProof().AggregateVal()
     bAtt := b.AggregateAttestationAndProof().AggregateVal()
     if aAtt.GetData().Slot != bAtt.GetData().Slot {

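The new aggregatorIndexFilter parameter lets one equality helper serve two callers: savePendingAggregate still requires the aggregator index to match, while the dedup pass in processAttestations ignores it so that aggregates differing only by aggregator are processed once. A self-contained sketch of that filter-plus-ContainsFunc shape, using simplified stand-in types rather than the ethpb interfaces:

```go
package main

import (
	"fmt"
	"slices"
)

type aggregatorIndexFilter int

const (
	ignoreAggregatorIndex aggregatorIndexFilter = iota
	includeAggregatorIndex
)

// aggregate is a simplified stand-in for a signed aggregate-and-proof.
type aggregate struct {
	aggregatorIndex uint64
	slot            uint64
	committeeIndex  uint64
}

// equal compares two aggregates; the filter decides whether the aggregator
// index participates in the comparison.
func equal(a, b aggregate, filter aggregatorIndexFilter) bool {
	if filter == includeAggregatorIndex && a.aggregatorIndex != b.aggregatorIndex {
		return false
	}
	return a.slot == b.slot && a.committeeIndex == b.committeeIndex
}

func main() {
	pending := []aggregate{
		{aggregatorIndex: 1, slot: 10, committeeIndex: 3},
		{aggregatorIndex: 2, slot: 10, committeeIndex: 3}, // same payload, different aggregator
		{aggregatorIndex: 3, slot: 11, committeeIndex: 3},
	}

	// Deduplicate: skip aggregates that only differ by aggregator index.
	unique := make([]aggregate, 0, len(pending))
	for _, agg := range pending {
		if slices.ContainsFunc(unique, func(other aggregate) bool {
			return equal(agg, other, ignoreAggregatorIndex)
		}) {
			continue
		}
		unique = append(unique, agg)
	}
	fmt.Println(len(unique)) // 2
}
```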
@@ -94,7 +94,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
     // Process block A (which exists and has no pending attestations)
     // This should skip processing attestations for A and request blocks B and C
     require.NoError(t, r.processPendingAttsForBlock(t.Context(), rootA))
-    require.LogsContain(t, hook, "Requesting block by root")
+    require.LogsContain(t, hook, "Requesting blocks by root")
 }

 func TestProcessPendingAtts_HasBlockSaveUnaggregatedAtt(t *testing.T) {

@@ -911,17 +911,17 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
             },
             AggregationBits: bitfield.Bitlist{0b1111},
         }}}
-        assert.Equal(t, true, pendingAggregatesAreEqual(a, b))
+        assert.Equal(t, true, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
     })
     t.Run("different version", func(t *testing.T) {
         a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
         b := &ethpb.SignedAggregateAttestationAndProofElectra{Message: &ethpb.AggregateAttestationAndProofElectra{AggregatorIndex: 1}}
-        assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+        assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
     })
     t.Run("different aggregator index", func(t *testing.T) {
         a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
         b := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 2}}
-        assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+        assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
     })
     t.Run("different slot", func(t *testing.T) {
         a := &ethpb.SignedAggregateAttestationAndProof{

@@ -942,7 +942,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
             },
             AggregationBits: bitfield.Bitlist{0b1111},
         }}}
-        assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+        assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
     })
     t.Run("different committee index", func(t *testing.T) {
         a := &ethpb.SignedAggregateAttestationAndProof{

@@ -963,7 +963,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
             },
             AggregationBits: bitfield.Bitlist{0b1111},
         }}}
-        assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
+        assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
     })
     t.Run("different aggregation bits", func(t *testing.T) {
         a := &ethpb.SignedAggregateAttestationAndProof{

@@ -984,7 +984,30 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
|
|||||||
},
|
},
|
||||||
AggregationBits: bitfield.Bitlist{0b1000},
|
AggregationBits: bitfield.Bitlist{0b1000},
|
||||||
}}}
|
}}}
|
||||||
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
|
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
|
||||||
|
})
|
||||||
|
t.Run("different aggregator index should be equal while ignoring aggregator index", func(t *testing.T) {
|
||||||
|
a := ðpb.SignedAggregateAttestationAndProof{
|
||||||
|
Message: ðpb.AggregateAttestationAndProof{
|
||||||
|
AggregatorIndex: 1,
|
||||||
|
Aggregate: ðpb.Attestation{
|
||||||
|
Data: ðpb.AttestationData{
|
||||||
|
Slot: 1,
|
||||||
|
CommitteeIndex: 1,
|
||||||
|
},
|
||||||
|
AggregationBits: bitfield.Bitlist{0b1111},
|
||||||
|
}}}
|
||||||
|
b := ðpb.SignedAggregateAttestationAndProof{
|
||||||
|
Message: ðpb.AggregateAttestationAndProof{
|
||||||
|
AggregatorIndex: 2,
|
||||||
|
Aggregate: ðpb.Attestation{
|
||||||
|
Data: ðpb.AttestationData{
|
||||||
|
Slot: 1,
|
||||||
|
CommitteeIndex: 1,
|
||||||
|
},
|
||||||
|
AggregationBits: bitfield.Bitlist{0b1111},
|
||||||
|
}}}
|
||||||
|
assert.Equal(t, true, pendingAggregatesAreEqual(a, b, ignoreAggregatorIndex))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
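The test changes above assume a comparator that takes an extra argument controlling whether the aggregator index participates in the comparison. Below is a minimal sketch of what such a helper could look like; the constants `includeAggregatorIndex` and `ignoreAggregatorIndex` appear in the diff, but the mode type, the import path, and the exact equality rules here are illustrative assumptions, not Prysm's actual implementation.

```go
package sync

import (
	"bytes"

	ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1" // path assumed from the module used in the diff
)

// Hypothetical mode constants; the real definitions in Prysm may differ.
type aggregatorIndexMode bool

const (
	includeAggregatorIndex aggregatorIndexMode = true
	ignoreAggregatorIndex  aggregatorIndexMode = false
)

// pendingAggregatesAreEqual (sketch): two pending aggregates are considered equal
// when slot, committee index, and aggregation bits match and, if requested, the
// aggregator index as well.
func pendingAggregatesAreEqual(a, b *ethpb.SignedAggregateAttestationAndProof, mode aggregatorIndexMode) bool {
	if mode == includeAggregatorIndex && a.Message.AggregatorIndex != b.Message.AggregatorIndex {
		return false
	}
	aAgg, bAgg := a.Message.Aggregate, b.Message.Aggregate
	if aAgg == nil || bAgg == nil {
		return aAgg == nil && bAgg == nil
	}
	return aAgg.Data.Slot == bAgg.Data.Slot &&
		aAgg.Data.CommitteeIndex == bAgg.Data.CommitteeIndex &&
		bytes.Equal(aAgg.AggregationBits, bAgg.AggregationBits)
}
```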
@@ -2,7 +2,6 @@ package sync
 
 import (
 	"context"
-	"encoding/hex"
 	"fmt"
 	"slices"
 	"sync"
@@ -44,11 +43,13 @@ func (s *Service) processPendingBlocksQueue() {
 		if !s.chainIsStarted() {
 			return
 		}
 
 		locker.Lock()
+		defer locker.Unlock()
+
 		if err := s.processPendingBlocks(s.ctx); err != nil {
 			log.WithError(err).Debug("Could not process pending blocks")
 		}
-		locker.Unlock()
 	})
 }
 
@@ -73,8 +74,10 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
 	randGen := rand.NewGenerator()
 	var parentRoots [][32]byte
 
+	blkRoots := make([][32]byte, 0, len(sortedSlots)*maxBlocksPerSlot)
+
 	// Iterate through sorted slots.
-	for _, slot := range sortedSlots {
+	for i, slot := range sortedSlots {
 		// Skip processing if slot is in the future.
 		if slot > s.cfg.clock.CurrentSlot() {
 			continue
@@ -91,6 +94,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
 
 		// Process each block in the queue.
 		for _, b := range blocksInCache {
+			start := time.Now()
+			totalDuration := time.Duration(0)
+
 			if err := blocks.BeaconBlockIsNil(b); err != nil {
 				continue
 			}
@@ -147,19 +153,34 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
 			}
 			cancelFunction()
 
-			// Process pending attestations for this block.
-			if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
-				log.WithError(err).Debug("Failed to process pending attestations for block")
-			}
+			blkRoots = append(blkRoots, blkRoot)
 
 			// Remove the processed block from the queue.
 			if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
 				return err
 			}
-			log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache")
+
+			duration := time.Since(start)
+			totalDuration += duration
+			log.WithFields(logrus.Fields{
+				"slotIndex":     fmt.Sprintf("%d/%d", i+1, len(sortedSlots)),
+				"slot":          slot,
+				"root":          fmt.Sprintf("%#x", blkRoot),
+				"duration":      duration,
+				"totalDuration": totalDuration,
+			}).Debug("Processed pending block and cleared it in cache")
 		}
 
 		span.End()
 	}
 
+	for _, blkRoot := range blkRoots {
+		// Process pending attestations for this block.
+		if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
+			log.WithError(err).Debug("Failed to process pending attestations for block")
+		}
+	}
+
 	return s.sendBatchRootRequest(ctx, parentRoots, randGen)
 }
 
@@ -379,6 +400,19 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
 		req = roots[:maxReqBlock]
 	}
 
+	if logrus.GetLevel() >= logrus.DebugLevel {
+		rootsStr := make([]string, 0, len(roots))
+		for _, req := range roots {
+			rootsStr = append(rootsStr, fmt.Sprintf("%#x", req))
+		}
+
+		log.WithFields(logrus.Fields{
+			"peer":  pid,
+			"count": len(req),
+			"roots": rootsStr,
+		}).Debug("Requesting blocks by root")
+	}
+
 	// Send the request to the peer.
 	if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
 		tracing.AnnotateError(span, err)
@@ -438,8 +472,6 @@ func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte
 			roots = append(roots[:i], roots[i+1:]...)
 			continue
 		}
-
-		log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
 	}
 	return roots
 }
 
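One pattern worth calling out from the `sendBatchRootRequest` hunk above: the per-root hex formatting only happens when debug logging is actually enabled, so the allocation and formatting cost disappears at the default log level. A standalone sketch of the same guard, with an illustrative function name and parameters:

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

// logRequestedRoots formats block roots for a debug log, but only when the
// logger is verbose enough for the message to actually be emitted.
func logRequestedRoots(roots [][32]byte) {
	if logrus.GetLevel() < logrus.DebugLevel {
		return // skip the per-root fmt.Sprintf work entirely
	}
	rootsStr := make([]string, 0, len(roots))
	for _, root := range roots {
		rootsStr = append(rootsStr, fmt.Sprintf("%#x", root))
	}
	logrus.WithFields(logrus.Fields{
		"count": len(roots),
		"roots": rootsStr,
	}).Debug("Requesting blocks by root")
}

func main() {
	logrus.SetLevel(logrus.DebugLevel)
	logRequestedRoots([][32]byte{{0x01}, {0x02}})
}
```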
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"path"
+	"sync"
 	"time"
 
 	"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
@@ -48,7 +49,15 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
 		return errors.Wrap(err, "new ro block with root")
 	}
 
-	go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
+	var wg sync.WaitGroup
+	wg.Go(func() {
+		if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil {
+			log.WithError(err).WithFields(logrus.Fields{
+				"root": fmt.Sprintf("%#x", root),
+				"slot": block.Slot(),
+			}).Error("Failed to process sidecars from execution from block")
+		}
+	})
 
 	if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
 		if blockchain.IsInvalidBlock(err) {
@@ -69,28 +78,33 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
 		}
 		return err
 	}
 
 	if err := s.processPendingAttsForBlock(ctx, root); err != nil {
 		return errors.Wrap(err, "process pending atts for block")
 	}
 
+	wg.Wait()
+
 	return nil
 }
 
 // processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
 // builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
-func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
+func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
 	if roBlock.Version() >= version.Fulu {
 		if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
-			log.WithError(err).Error("Failed to process data column sidecars from execution")
-			return
+			return errors.Wrap(err, "process data column sidecars from execution")
 		}
 
-		return
+		return nil
 	}
 
 	if roBlock.Version() >= version.Deneb {
 		s.processBlobSidecarsFromExecution(ctx, roBlock)
-		return
+		return nil
 	}
+
+	return nil
 }
 
 // processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
@@ -168,7 +182,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
 	key := fmt.Sprintf("%#x", source.Root())
 	if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
 		const delay = 250 * time.Millisecond
-		secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
 
 		commitments, err := source.Commitments()
 		if err != nil {
@@ -186,9 +199,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
 			return nil, errors.Wrap(err, "column indices to sample")
 		}
 
-		ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
-		defer cancel()
-
 		log := log.WithFields(logrus.Fields{
 			"root": fmt.Sprintf("%#x", source.Root()),
 			"slot": source.Slot(),
@@ -209,6 +219,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
 				return nil, nil
 			}
 
+			// Return if the context is done.
+			if ctx.Err() != nil {
+				return nil, ctx.Err()
+			}
+
 			if iteration == 0 {
 				dataColumnsRecoveredFromELAttempts.Inc()
 			}
@@ -220,20 +235,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
 			}
 
 			// No sidecars are retrieved from the EL, retry later
-			constructedSidecarCount = uint64(len(constructedSidecars))
-			if constructedSidecarCount == 0 {
-				if ctx.Err() != nil {
-					return nil, ctx.Err()
-				}
-
-				time.Sleep(delay)
-				continue
-			}
-
-			dataColumnsRecoveredFromELTotal.Inc()
+			constructedCount := uint64(len(constructedSidecars))
 
 			// Boundary check.
-			if constructedSidecarCount != fieldparams.NumberOfColumns {
+			if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns {
 				return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
 			}
 
@@ -242,15 +247,25 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
 				return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
 			}
 
+			if constructedCount > 0 {
+				dataColumnsRecoveredFromELTotal.Inc()
+
 				log.WithFields(logrus.Fields{
+					"root":          fmt.Sprintf("%#x", source.Root()),
+					"slot":          source.Slot(),
+					"proposerIndex": source.ProposerIndex(),
+					"iteration":     iteration,
+					"type":          source.Type(),
 					"count":   len(unseenIndices),
 					"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
 				}).Debug("Constructed data column sidecars from the execution client")
 
-			dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices)))
-
 				return nil, nil
 			}
 
+			// Wait before retrying.
+			time.Sleep(delay)
+		}
 	}); err != nil {
 		return err
 	}
@@ -284,6 +299,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
 		unseenIndices[sidecar.Index] = true
 	}
 
+	// Exit early if there is nothing to broadcast or receive.
+	if len(unseenSidecars) == 0 {
+		return nil, nil
+	}
+
 	// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
 	if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
 		return nil, errors.Wrap(err, "broadcast data column sidecars")
 
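The subscriber change above replaces a fire-and-forget `go` statement with a goroutine whose error is logged and whose completion is awaited before the subscriber returns (the diff uses the `sync.WaitGroup.Go` helper from recent Go releases). A simplified sketch of the same shape, with placeholder `processSidecars` and `receiveBlock` callbacks standing in for the real service methods:

```go
package main

import (
	"context"
	"log"
	"sync"
)

// handleBlock runs sidecar processing concurrently with block import, then
// waits for it so its failure is surfaced instead of being silently dropped.
func handleBlock(ctx context.Context, processSidecars, receiveBlock func(context.Context) error) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processSidecars(ctx); err != nil {
			log.Printf("failed to process sidecars: %v", err)
		}
	}()

	if err := receiveBlock(ctx); err != nil {
		return err
	}

	wg.Wait()
	return nil
}

func main() {
	noop := func(context.Context) error { return nil }
	if err := handleBlock(context.Background(), noop, noop); err != nil {
		log.Fatal(err)
	}
}
```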
@@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
 				},
 				seenBlobCache: lruwrpr.New(1),
 			}
-			s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+			err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+			require.NoError(t, err)
 			require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
 		})
 	}
@@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
 			roBlock, err := blocks.NewROBlock(sb)
 			require.NoError(t, err)
 
-			s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+			err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+			require.NoError(t, err)
 			require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
 		})
 	}
 
@@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
 	}
 
 	if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
-		return errors.Wrap(err, "receive data column sidecar")
+		return wrapDataColumnError(sidecar, "receive data column sidecar", err)
 	}
 
 	wg.Go(func() error {
 		if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
-			return errors.Wrap(err, "process data column sidecars from reconstruction")
+			return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err)
 		}
 
 		return nil
@@ -38,7 +38,7 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
 
 	wg.Go(func() error {
 		if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
-			return errors.Wrap(err, "process data column sidecars from execution")
+			return wrapDataColumnError(sidecar, "process data column sidecars from execution", err)
 		}
 
 		return nil
@@ -110,3 +110,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
 
 	return allSubnets
 }
+
+func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error {
+	return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err)
+}
 
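The new `wrapDataColumnError` helper attaches slot and block-root context while preserving the wrapped error via `%w`, so callers can still match the underlying cause. A small self-contained illustration of that wrapping pattern (the names here are placeholders; only the `%w` mechanics mirror the helper):

```go
package main

import (
	"errors"
	"fmt"
)

var errSidecarRejected = errors.New("sidecar rejected")

// wrapWithColumnContext prepends identifying context but keeps the original
// error reachable through errors.Is / errors.As thanks to the %w verb.
func wrapWithColumnContext(message string, slot uint64, root [32]byte, err error) error {
	return fmt.Errorf("%s - slot %d, root %#x: %w", message, slot, root, err)
}

func main() {
	err := wrapWithColumnContext("receive data column sidecar", 42, [32]byte{0xab}, errSidecarRejected)
	fmt.Println(err)                                // contextualized message
	fmt.Println(errors.Is(err, errSidecarRejected)) // true: the cause survives wrapping
}
```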
@@ -51,14 +51,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
 	// Decode the message, reject if it fails.
 	m, err := s.decodePubsubMessage(msg)
 	if err != nil {
-		log.WithError(err).Error("Failed to decode message")
 		return pubsub.ValidationReject, err
 	}
 
 	// Reject messages that are not of the expected type.
 	dcsc, ok := m.(*eth.DataColumnSidecar)
 	if !ok {
-		log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
 		return pubsub.ValidationReject, errWrongMessage
 	}
 
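With the error logs removed, the validator relies solely on its return values: the rejection reason travels back to the caller, which decides whether and how to log. A bare-bones, dependency-free sketch of that shape (the result type, decode callback, and sidecar type check are placeholders rather than the real libp2p pubsub API):

```go
package main

import (
	"errors"
	"fmt"
)

// validationResult mimics the accept/reject outcome a pubsub validator returns.
type validationResult int

const (
	validationAccept validationResult = iota
	validationReject
)

var errWrongMessage = errors.New("message is not of the expected type")

// validateSidecar returns a rejection reason instead of logging it, leaving
// the logging decision to the caller.
func validateSidecar(decode func() (any, error)) (validationResult, error) {
	m, err := decode()
	if err != nil {
		return validationReject, err
	}
	if _, ok := m.(string); !ok { // placeholder for the real sidecar type assertion
		return validationReject, errWrongMessage
	}
	return validationAccept, nil
}

func main() {
	res, err := validateSidecar(func() (any, error) { return 3, nil })
	fmt.Println(res == validationReject, err) // true, wrong-message error surfaced to caller
}
```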
@@ -361,7 +361,7 @@ func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.
 	}
 
 	if !dv.fc.HasNode(parentRoot) {
-		return columnErrBuilder(errSidecarParentNotSeen)
+		return columnErrBuilder(errors.Wrapf(errSidecarParentNotSeen, "parent root: %#x", parentRoot))
 	}
 }
 
changelog/james-prysm_align-atter-pool-apis.md — new file
@@ -0,0 +1,3 @@
+### Changed
+
+- The /eth/v2/beacon/pool/attestations and /eth/v1/beacon/pool/sync_committees endpoints now return a 503 error if the node is still syncing; the REST API also broadcasts immediately now, matching the gRPC behavior.

changelog/manu-agg.md — new file
@@ -0,0 +1,3 @@
+### Changed
+
+- Pending aggregates: when multiple aggregated attestations that differ only by aggregator index are in the pending queue, only process one of them.

changelog/manu-remove-error-logs.md — new file
@@ -0,0 +1,2 @@
+### Changed
+- `validateDataColumn`: Remove error logs.

changelog/manu_disable_get_blobs_v2.md — new file
@@ -0,0 +1,2 @@
+### Added
+- `--disable-get-blobs-v2` flag.
@@ -356,4 +356,9 @@
 		Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
 		Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
 	}
+	// DisableGetBlobsV2 disables the engine_getBlobsV2 usage.
+	DisableGetBlobsV2 = &cli.BoolFlag{
+		Name:  "disable-get-blobs-v2",
+		Usage: "Disables the engine_getBlobsV2 usage.",
+	}
 )
 
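The new flag follows the usual urfave/cli pattern visible in these hunks: declare a `BoolFlag`, register it with the app, and read it through `ctx.Bool` when building the runtime configuration. A condensed sketch assuming urfave/cli v2, which matches the API shapes shown above (the real Prysm wiring goes through `GlobalFlags`/`ConfigureGlobalFlags` and is omitted here):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// disableGetBlobsV2 mirrors the flag added in the diff.
var disableGetBlobsV2 = &cli.BoolFlag{
	Name:  "disable-get-blobs-v2",
	Usage: "Disables the engine_getBlobsV2 usage.",
}

func main() {
	app := &cli.App{
		Name:  "beacon-chain",
		Flags: []cli.Flag{disableGetBlobsV2},
		Action: func(ctx *cli.Context) error {
			if ctx.Bool(disableGetBlobsV2.Name) {
				fmt.Println("engine_getBlobsV2 disabled")
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```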
@@ -17,6 +17,7 @@ type GlobalFlags struct {
 	SubscribeToAllSubnets bool
 	Supernode             bool
 	SemiSupernode         bool
+	DisableGetBlobsV2     bool
 	MinimumSyncPeers      int
 	MinimumPeersPerSubnet int
 	MaxConcurrentDials    int
@@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
 		cfg.SemiSupernode = true
 	}
 
+	if ctx.Bool(DisableGetBlobsV2.Name) {
+		log.Warning("Disabling `engine_getBlobsV2` API")
+		cfg.DisableGetBlobsV2 = true
+	}
+
 	// State-diff-exponents
 	cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name)
 	if features.Get().EnableStateDiff {
 
@@ -147,6 +147,7 @@ var appFlags = []cli.Flag{
 	flags.SlasherDirFlag,
 	flags.SlasherFlag,
 	flags.JwtId,
+	flags.DisableGetBlobsV2,
 	storage.BlobStoragePathFlag,
 	storage.DataColumnStoragePathFlag,
 	storage.BlobStorageLayout,
 
@@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{
 			flags.ExecutionJWTSecretFlag,
 			flags.JwtId,
 			flags.InteropMockEth1DataVotesFlag,
+			flags.DisableGetBlobsV2,
 		},
 	},
 	{ // Flags relevant to configuring beacon chain monitoring.