Compare commits

...

8 Commits

Author SHA1 Message Date
Kasey Kirkham
92194a3cb5 Revert "set limit to multiple of burst for goerli (#13544)"
This reverts commit 373c853d17.
2024-01-29 10:17:19 -06:00
kasey
373c853d17 set limit to multiple of burst for goerli (#13544)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-01-27 22:12:08 +00:00
terence
23b0718b5f Add metric for data availability wait time (#13534)
* Add metric for data availability wait time

* Kasey's feedback

* Kasey's feedback
2024-01-26 18:17:25 +00:00
terence
3a9854145c Correct metrics from ns to ms (#13540) 2024-01-26 17:43:30 +00:00
Radosław Kapka
1b70d2b566 Fetch unaggregated atts in GetAggregateAttestation (#13533) 2024-01-26 17:08:58 +00:00
Nishant Das
59b310a221 make it the same (#13531) 2024-01-26 05:35:27 +00:00
Nishant Das
22b6d1751d Enable Backfill in E2E (#13524)
* enable backfill for devmode

* enable backfill

* gaz

* move to its own package

* fix panic

* fix bug

* gaz

* kasey's review
2024-01-26 04:37:41 +00:00
Potuz
9c13d47f4c fix off by one (#13529) 2024-01-26 00:05:56 +00:00
25 changed files with 220 additions and 119 deletions

View File

@@ -182,6 +182,10 @@ var (
Name: "chain_service_processing_milliseconds",
Help: "Total time to call a chain service in ReceiveBlock()",
})
+ dataAvailWaitedTime = promauto.NewSummary(prometheus.SummaryOpts{
+ Name: "da_waited_time_milliseconds",
+ Help: "Total time spent waiting for a data availability check in ReceiveBlock()",
+ })
processAttsElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "process_attestations_milliseconds",

View File

@@ -325,7 +325,10 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
}
// The proposer indices cache takes the target root for the previous
// epoch as key
- target, err := s.cfg.ForkChoiceStore.TargetRootForEpoch(r, e-1)
+ if e > 0 {
+ e = e - 1
+ }
+ target, err := s.cfg.ForkChoiceStore.TargetRootForEpoch(r, e)
if err != nil {
log.WithError(err).Error("could not update proposer index state-root map")
return nil
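
For context on the guard restored above: Prysm's primitives.Epoch is backed by an unsigned integer, so computing `e - 1` at epoch 0 wraps around to a huge value instead of going negative. A minimal sketch of the failure mode (the Epoch type here is a stand-in, assuming the real one is uint64-based):

```go
package main

import "fmt"

// Epoch stands in for primitives.Epoch, assumed to be backed by uint64.
type Epoch uint64

func main() {
	var e Epoch             // epoch 0
	fmt.Println(e - 1)      // 18446744073709551615: unsigned wrap-around, not -1
	if e > 0 {              // the guard from the hunk above avoids the wrap
		e = e - 1
	}
	fmt.Println(e) // 0
}
```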

View File

@@ -122,6 +122,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
}
}
daWaitedTime := time.Since(daStartTime)
+ dataAvailWaitedTime.Observe(float64(daWaitedTime.Milliseconds()))
// Defragment the state before continuing block processing.
s.defragmentState(postState)

View File

@@ -115,6 +115,7 @@ func (p *ProposerIndicesCache) IndicesFromCheckpoint(c forkchoicetypes.Checkpoin
root, ok := p.rootMap[c]
p.Unlock()
if !ok {
+ ProposerIndicesCacheMiss.Inc()
return emptyIndices, ok
}
return p.ProposerIndices(c.Epoch+1, root)

View File

@@ -37,70 +37,69 @@ func TestProposerCache_Set(t *testing.T) {
func TestProposerCache_CheckpointAndPrune(t *testing.T) {
cache := NewProposerIndicesCache()
indices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
- root := [32]byte{'a'}
- cpRoot := [32]byte{'b'}
copy(indices[3:], []primitives.ValidatorIndex{1, 2, 3, 4, 5, 6})
for i := 1; i < 10; i++ {
+ root := [32]byte{byte(i)}
cache.Set(primitives.Epoch(i), root, indices)
+ cpRoot := [32]byte{byte(i - 1)}
cache.SetCheckpoint(forkchoicetypes.Checkpoint{Epoch: primitives.Epoch(i - 1), Root: cpRoot}, root)
}
- received, ok := cache.ProposerIndices(1, root)
+ received, ok := cache.ProposerIndices(1, [32]byte{1})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.ProposerIndices(4, root)
+ received, ok = cache.ProposerIndices(4, [32]byte{4})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.ProposerIndices(9, root)
+ received, ok = cache.ProposerIndices(9, [32]byte{9})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 0, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: [32]byte{3}})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: [32]byte{4}})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: [32]byte{8}})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
cache.Prune(5)
emptyIndices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
- received, ok = cache.ProposerIndices(1, root)
+ received, ok = cache.ProposerIndices(1, [32]byte{1})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
- received, ok = cache.ProposerIndices(4, root)
+ received, ok = cache.ProposerIndices(4, [32]byte{4})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
- received, ok = cache.ProposerIndices(9, root)
+ received, ok = cache.ProposerIndices(9, [32]byte{9})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 0, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 0, Root: [32]byte{0}})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: [32]byte{3}})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: [32]byte{4}})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
- received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: cpRoot})
+ received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: [32]byte{8}})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
}

View File

@@ -379,7 +379,7 @@ func UpdateCachedCheckpointToStateRoot(state state.ReadOnlyBeaconState, cp *fork
if cp.Epoch <= params.BeaconConfig().GenesisEpoch+params.BeaconConfig().MinSeedLookahead {
return nil
}
- slot, err := slots.EpochEnd(cp.Epoch - 1)
+ slot, err := slots.EpochEnd(cp.Epoch)
if err != nil {
return err
}
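
A worked sketch of this off-by-one fix, assuming the usual convention that EpochEnd(e) returns the last slot of epoch e, i.e. (e+1)*SlotsPerEpoch - 1, with 32 slots per epoch:

```go
package main

import "fmt"

const slotsPerEpoch = 32

// epochEnd mirrors the assumed semantics of slots.EpochEnd.
func epochEnd(epoch uint64) uint64 {
	return (epoch+1)*slotsPerEpoch - 1
}

func main() {
	cpEpoch := uint64(5)
	fmt.Println(epochEnd(cpEpoch - 1)) // 159: last slot of epoch 4 (old code, one epoch early)
	fmt.Println(epochEnd(cpEpoch))     // 191: last slot of epoch 5 (the fix)
}
```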

View File

@@ -38,7 +38,7 @@ import (
// GetAggregateAttestation aggregates all attestations matching the given attestation data root and slot, returning the aggregated result.
func (s *Server) GetAggregateAttestation(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.GetAggregateAttestation")
_, span := trace.StartSpan(r.Context(), "validator.GetAggregateAttestation")
defer span.End()
_, attDataRoot, ok := shared.HexFromQuery(w, r, "attestation_data_root", fieldparams.RootLength, true)
@@ -51,53 +51,67 @@ func (s *Server) GetAggregateAttestation(w http.ResponseWriter, r *http.Request)
return
}
- if err := s.AttestationsPool.AggregateUnaggregatedAttestations(ctx); err != nil {
- httputil.HandleError(w, "Could not aggregate unaggregated attestations: "+err.Error(), http.StatusBadRequest)
+ var match *ethpbalpha.Attestation
+ var err error
+ match, err = matchingAtt(s.AttestationsPool.AggregatedAttestations(), primitives.Slot(slot), attDataRoot)
+ if err != nil {
+ httputil.HandleError(w, "Could not get matching attestation: "+err.Error(), http.StatusInternalServerError)
return
}
- allAtts := s.AttestationsPool.AggregatedAttestations()
- var bestMatchingAtt *ethpbalpha.Attestation
- for _, att := range allAtts {
- if att.Data.Slot == primitives.Slot(slot) {
- root, err := att.Data.HashTreeRoot()
- if err != nil {
- httputil.HandleError(w, "Could not get attestation data root: "+err.Error(), http.StatusInternalServerError)
- return
- }
- if bytes.Equal(root[:], attDataRoot) {
- if bestMatchingAtt == nil || len(att.AggregationBits) > len(bestMatchingAtt.AggregationBits) {
- bestMatchingAtt = att
- }
- }
+ if match == nil {
+ atts, err := s.AttestationsPool.UnaggregatedAttestations()
+ if err != nil {
+ httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+ match, err = matchingAtt(atts, primitives.Slot(slot), attDataRoot)
+ if err != nil {
+ httputil.HandleError(w, "Could not get matching attestation: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+ }
- if bestMatchingAtt == nil {
+ if match == nil {
httputil.HandleError(w, "No matching attestation found", http.StatusNotFound)
return
}
response := &AggregateAttestationResponse{
Data: &shared.Attestation{
- AggregationBits: hexutil.Encode(bestMatchingAtt.AggregationBits),
+ AggregationBits: hexutil.Encode(match.AggregationBits),
Data: &shared.AttestationData{
- Slot: strconv.FormatUint(uint64(bestMatchingAtt.Data.Slot), 10),
- CommitteeIndex: strconv.FormatUint(uint64(bestMatchingAtt.Data.CommitteeIndex), 10),
- BeaconBlockRoot: hexutil.Encode(bestMatchingAtt.Data.BeaconBlockRoot),
+ Slot: strconv.FormatUint(uint64(match.Data.Slot), 10),
+ CommitteeIndex: strconv.FormatUint(uint64(match.Data.CommitteeIndex), 10),
+ BeaconBlockRoot: hexutil.Encode(match.Data.BeaconBlockRoot),
Source: &shared.Checkpoint{
- Epoch: strconv.FormatUint(uint64(bestMatchingAtt.Data.Source.Epoch), 10),
- Root: hexutil.Encode(bestMatchingAtt.Data.Source.Root),
+ Epoch: strconv.FormatUint(uint64(match.Data.Source.Epoch), 10),
+ Root: hexutil.Encode(match.Data.Source.Root),
},
Target: &shared.Checkpoint{
- Epoch: strconv.FormatUint(uint64(bestMatchingAtt.Data.Target.Epoch), 10),
- Root: hexutil.Encode(bestMatchingAtt.Data.Target.Root),
+ Epoch: strconv.FormatUint(uint64(match.Data.Target.Epoch), 10),
+ Root: hexutil.Encode(match.Data.Target.Root),
},
},
- Signature: hexutil.Encode(bestMatchingAtt.Signature),
+ Signature: hexutil.Encode(match.Signature),
}}
httputil.WriteJson(w, response)
}
+ func matchingAtt(atts []*ethpbalpha.Attestation, slot primitives.Slot, attDataRoot []byte) (*ethpbalpha.Attestation, error) {
+ for _, att := range atts {
+ if att.Data.Slot == slot {
+ root, err := att.Data.HashTreeRoot()
+ if err != nil {
+ return nil, errors.Wrap(err, "could not get attestation data root")
+ }
+ if bytes.Equal(root[:], attDataRoot) {
+ return att, nil
+ }
+ }
+ }
+ return nil, nil
+ }
// SubmitContributionAndProofs publishes multiple signed sync committee contribution and proofs.
func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.SubmitContributionAndProofs")
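
For reference, the handler above serves the Beacon API's aggregate-attestation endpoint; with this change it falls back to the unaggregated pool when no aggregate matches. A hypothetical client call (host, port, and query values are placeholders, not from this diff):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// GET /eth/v1/validator/aggregate_attestation per the Beacon API spec.
	url := "http://localhost:3500/eth/v1/validator/aggregate_attestation" +
		"?attestation_data_root=0x0000000000000000000000000000000000000000000000000000000000000000&slot=2"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Expect 404 "No matching attestation found" when neither pool has a match.
	fmt.Println(resp.Status, string(body))
}
```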

View File

@@ -71,7 +71,7 @@ func TestGetAggregateAttestation(t *testing.T) {
AggregationBits: []byte{0, 1, 1},
Data: &ethpbalpha.AttestationData{
Slot: 2,
- CommitteeIndex: 2,
+ CommitteeIndex: 1,
BeaconBlockRoot: root21,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
@@ -90,7 +90,7 @@ func TestGetAggregateAttestation(t *testing.T) {
AggregationBits: []byte{0, 1, 1, 1},
Data: &ethpbalpha.AttestationData{
Slot: 2,
- CommitteeIndex: 3,
+ CommitteeIndex: 1,
BeaconBlockRoot: root22,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
@@ -103,33 +103,56 @@ func TestGetAggregateAttestation(t *testing.T) {
},
Signature: sig22,
}
- root33 := bytesutil.PadTo([]byte("root3_3"), 32)
- sig33 := bls.NewAggregateSignature().Marshal()
- attslot33 := &ethpbalpha.Attestation{
- AggregationBits: []byte{1, 0, 0, 1},
+ root31 := bytesutil.PadTo([]byte("root3_1"), 32)
+ sig31 := bls.NewAggregateSignature().Marshal()
+ attslot31 := &ethpbalpha.Attestation{
+ AggregationBits: []byte{1, 0},
Data: &ethpbalpha.AttestationData{
- Slot: 2,
- CommitteeIndex: 3,
- BeaconBlockRoot: root33,
+ Slot: 3,
+ CommitteeIndex: 1,
+ BeaconBlockRoot: root31,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
- Root: root33,
+ Root: root31,
},
Target: &ethpbalpha.Checkpoint{
Epoch: 1,
- Root: root33,
+ Root: root31,
},
},
- Signature: sig33,
+ Signature: sig31,
}
+ root32 := bytesutil.PadTo([]byte("root3_2"), 32)
+ sig32 := bls.NewAggregateSignature().Marshal()
+ attslot32 := &ethpbalpha.Attestation{
+ AggregationBits: []byte{0, 1},
+ Data: &ethpbalpha.AttestationData{
+ Slot: 3,
+ CommitteeIndex: 1,
+ BeaconBlockRoot: root32,
+ Source: &ethpbalpha.Checkpoint{
+ Epoch: 1,
+ Root: root32,
+ },
+ Target: &ethpbalpha.Checkpoint{
+ Epoch: 1,
+ Root: root32,
+ },
+ },
+ Signature: sig32,
+ }
pool := attestations.NewPool()
err := pool.SaveAggregatedAttestations([]*ethpbalpha.Attestation{attSlot1, attslot21, attslot22})
assert.NoError(t, err)
+ err = pool.SaveUnaggregatedAttestations([]*ethpbalpha.Attestation{attslot31, attslot32})
+ assert.NoError(t, err)
s := &Server{
AttestationsPool: pool,
}
t.Run("ok", func(t *testing.T) {
t.Run("matching aggregated att", func(t *testing.T) {
reqRoot, err := attslot22.Data.HashTreeRoot()
require.NoError(t, err)
attDataRoot := hexutil.Encode(reqRoot[:])
@@ -147,7 +170,7 @@ func TestGetAggregateAttestation(t *testing.T) {
assert.DeepEqual(t, "0x00010101", resp.Data.AggregationBits)
assert.DeepEqual(t, hexutil.Encode(sig22), resp.Data.Signature)
assert.Equal(t, "2", resp.Data.Data.Slot)
assert.Equal(t, "3", resp.Data.Data.CommitteeIndex)
assert.Equal(t, "1", resp.Data.Data.CommitteeIndex)
assert.DeepEqual(t, hexutil.Encode(root22), resp.Data.Data.BeaconBlockRoot)
require.NotNil(t, resp.Data.Data.Source)
assert.Equal(t, "1", resp.Data.Data.Source.Epoch)
@@ -156,19 +179,11 @@ func TestGetAggregateAttestation(t *testing.T) {
assert.Equal(t, "1", resp.Data.Data.Target.Epoch)
assert.DeepEqual(t, hexutil.Encode(root22), resp.Data.Data.Target.Root)
})
t.Run("aggregate beforehand", func(t *testing.T) {
err = s.AttestationsPool.SaveUnaggregatedAttestation(attslot33)
require.NoError(t, err)
newAtt := ethpbalpha.CopyAttestation(attslot33)
newAtt.AggregationBits = []byte{0, 1, 0, 1}
err = s.AttestationsPool.SaveUnaggregatedAttestation(newAtt)
require.NoError(t, err)
reqRoot, err := attslot33.Data.HashTreeRoot()
t.Run("matching unaggregated att", func(t *testing.T) {
reqRoot, err := attslot32.Data.HashTreeRoot()
require.NoError(t, err)
attDataRoot := hexutil.Encode(reqRoot[:])
url := "http://example.com?attestation_data_root=" + attDataRoot + "&slot=2"
url := "http://example.com?attestation_data_root=" + attDataRoot + "&slot=3"
request := httptest.NewRequest(http.MethodGet, url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -178,7 +193,18 @@ func TestGetAggregateAttestation(t *testing.T) {
resp := &AggregateAttestationResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
require.NotNil(t, resp)
assert.DeepEqual(t, "0x01010001", resp.Data.AggregationBits)
require.NotNil(t, resp.Data)
assert.DeepEqual(t, "0x0001", resp.Data.AggregationBits)
assert.DeepEqual(t, hexutil.Encode(sig32), resp.Data.Signature)
assert.Equal(t, "3", resp.Data.Data.Slot)
assert.Equal(t, "1", resp.Data.Data.CommitteeIndex)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.BeaconBlockRoot)
require.NotNil(t, resp.Data.Data.Source)
assert.Equal(t, "1", resp.Data.Data.Source.Epoch)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.Source.Root)
require.NotNil(t, resp.Data.Data.Target)
assert.Equal(t, "1", resp.Data.Data.Target.Epoch)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.Target.Root)
})
t.Run("no matching attestation", func(t *testing.T) {
attDataRoot := hexutil.Encode(bytesutil.PadTo([]byte("foo"), 32))

View File

@@ -54,6 +54,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",

View File

@@ -227,6 +227,10 @@ func (s *Service) Start() {
}
s.ms.setClock(clock)
+ if s.store.isGenesisSync() {
+ log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing")
+ return
+ }
status := s.store.status()
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() {
@@ -293,8 +297,10 @@ func minimumBackfillSlot(current primitives.Slot) primitives.Slot {
oe = slots.MaxSafeEpoch()
}
offset := slots.UnsafeEpochStart(oe)
- if offset > current {
- return 0
+ if offset >= current {
+ // Slot 0 is the genesis block, therefore the signature in it is invalid.
+ // To prevent us from rejecting a batch, we restrict the minimum backfill batch till only slot 1
+ return 1
}
return current - offset
}
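
A worked sketch of the new clamp, using assumed mainnet-style numbers (MIN_EPOCHS_FOR_BLOCK_REQUESTS = 33024, 32 slots per epoch) and a simplified stand-in for minimumBackfillSlot:

```go
package main

import "fmt"

const retentionSlots = 33024 * 32 // assumed MIN_EPOCHS_FOR_BLOCK_REQUESTS * SlotsPerEpoch = 1056768

func minimumBackfillSlot(current uint64) uint64 {
	if retentionSlots >= current {
		// Never backfill down to slot 0: the genesis block's signature is
		// invalid, so the floor is slot 1, as in the hunk above.
		return 1
	}
	return current - retentionSlots
}

func main() {
	fmt.Println(minimumBackfillSlot(2_000_000)) // 943232 = 2000000 - 1056768
	fmt.Println(minimumBackfillSlot(1_000))     // 1 (clamped)
}
```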

View File

@@ -5,9 +5,11 @@ import (
"testing"
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
"github.com/prysmaticlabs/prysm/v4/testing/require"
@@ -75,6 +77,20 @@ func TestServiceInit(t *testing.T) {
}
}
+ func TestMinimumBackfillSlot(t *testing.T) {
+ oe := helpers.MinEpochsForBlockRequests()
+ currSlot := (oe + 100).Mul(uint64(params.BeaconConfig().SlotsPerEpoch))
+ minSlot := minimumBackfillSlot(primitives.Slot(currSlot))
+ require.Equal(t, 100*params.BeaconConfig().SlotsPerEpoch, minSlot)
+ oe = helpers.MinEpochsForBlockRequests()
+ currSlot = oe.Mul(uint64(params.BeaconConfig().SlotsPerEpoch))
+ minSlot = minimumBackfillSlot(primitives.Slot(currSlot))
+ require.Equal(t, primitives.Slot(1), minSlot)
+ }
func testReadN(t *testing.T, ctx context.Context, c chan batch, n int, into []batch) []batch {
for i := 0; i < n; i++ {
select {

View File

@@ -149,6 +149,12 @@ func (s *Store) swapStatus(bs *dbval.BackfillStatus) {
s.bs = bs
}
+ func (s *Store) isGenesisSync() bool {
+ s.RLock()
+ defer s.RUnlock()
+ return s.genesisSync
+ }
// originState looks up the state for the checkpoint sync origin. This is a hack, because StatusUpdater is the only
// thing that needs db access and it has the origin root handy, so it's convenient to look it up here. The state is
// needed by the verifier.

View File

@@ -40,11 +40,13 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch {
dlt := time.Now()
backfillBatchTimeDownloading.Observe(float64(dlt.Sub(start).Milliseconds()))
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed")
return b.withRetryableError(err)
}
vb, err := w.v.verify(results)
backfillBatchTimeVerifying.Observe(float64(time.Since(dlt).Milliseconds()))
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch validation failed")
return b.withRetryableError(err)
}
// This is a hack to get the rough size of the batch. This helps us approximate the amount of memory needed

View File

@@ -227,8 +227,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
logFields["validationTime"] = validationTime
log.WithFields(logFields).Debug("Received block")
- blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime))
- blockVerificationGossipSummary.Observe(float64(validationTime))
+ blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
+ blockVerificationGossipSummary.Observe(float64(validationTime.Milliseconds()))
return pubsub.ValidationAccept, nil
}
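
The fix above (commit 3a9854145c, "Correct metrics from ns to ms") works because Go's time.Duration is an int64 count of nanoseconds, so a bare float64 conversion feeds nanoseconds into a summary whose name promises milliseconds. A minimal sketch:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 250 * time.Millisecond
	fmt.Println(float64(d))                // 2.5e+08: nanoseconds (the old, wrong observation)
	fmt.Println(float64(d.Milliseconds())) // 250: milliseconds (what the fix records)
}
```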

View File

@@ -22,6 +22,7 @@ go_library(
"//cmd/beacon-chain/jwt:go_default_library",
"//cmd/beacon-chain/storage:go_default_library",
"//cmd/beacon-chain/sync/backfill:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"//cmd/beacon-chain/sync/checkpoint:go_default_library",
"//cmd/beacon-chain/sync/genesis:go_default_library",
"//config/features:go_default_library",

View File

@@ -171,7 +171,7 @@ var (
BlobBatchLimit = &cli.IntFlag{
Name: "blob-batch-limit",
Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.",
- Value: 8,
+ Value: 64,
}
// BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase.
BlobBatchLimitBurstFactor = &cli.IntFlag{

View File

@@ -20,7 +20,8 @@ import (
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
jwtcommands "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/jwt"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/storage"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
bflags "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v4/config/features"
@@ -139,9 +140,9 @@ var appFlags = []cli.Flag{
flags.JwtId,
storage.BlobStoragePathFlag,
storage.BlobRetentionEpochFlag,
- backfill.EnableExperimentalBackfill,
- backfill.BackfillBatchSize,
- backfill.BackfillWorkerCount,
+ bflags.EnableExperimentalBackfill,
+ bflags.BackfillBatchSize,
+ bflags.BackfillWorkerCount,
}
func init() {

View File

@@ -8,6 +8,7 @@ go_library(
deps = [
"//beacon-chain/node:go_default_library",
"//beacon-chain/sync/backfill:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@@ -0,0 +1,9 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["flags.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags",
visibility = ["//visibility:public"],
deps = ["@com_github_urfave_cli_v2//:go_default_library"],
)

View File

@@ -0,0 +1,38 @@
+ package flags
+ import (
+ "github.com/urfave/cli/v2"
+ )
+ var (
+ backfillBatchSizeName = "backfill-batch-size"
+ backfillWorkerCountName = "backfill-worker-count"
+ // EnableExperimentalBackfill enables backfill for checkpoint synced nodes.
+ // This flag will be removed onced backfill is enabled by default.
+ EnableExperimentalBackfill = &cli.BoolFlag{
+ Name: "enable-experimental-backfill",
+ Usage: "Backfill is still experimental at this time." +
+ "It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
+ }
+ // BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
+ // at the cost of higher memory.
+ BackfillBatchSize = &cli.Uint64Flag{
+ Name: backfillBatchSizeName,
+ Usage: "Number of blocks per backfill batch. " +
+ "A larger number will request more blocks at once from peers, but also consume more system memory to " +
+ "hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
+ Value: 64,
+ }
+ // BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
+ // network utilization at the cost of higher memory.
+ BackfillWorkerCount = &cli.IntFlag{
+ Name: backfillWorkerCountName,
+ Usage: "Number of concurrent backfill batch requests. " +
+ "A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
+ "consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
+ "average block size (~2MB before deneb) to find the right number for your system. " +
+ "This has a multiplicatice effect with " + backfillBatchSizeName,
+ Value: 2,
+ }
+ )
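
The two flags above compound: batch size times worker count bounds how many blocks are held in memory at once. A back-of-envelope estimate using the defaults and the ~2MB pre-Deneb block size cited in the usage text (all values are rough assumptions, not measurements):

```go
package main

import "fmt"

func main() {
	batchSize := 64 // --backfill-batch-size default
	workers := 2    // --backfill-worker-count default
	avgBlockMB := 2 // rough pre-Deneb block size from the flag usage text
	fmt.Printf("~%d MB of batches in flight\n", batchSize*workers*avgBlockMB) // ~256 MB
}
```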

View File

@@ -3,49 +3,18 @@ package backfill
import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/node"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/urfave/cli/v2"
)
- var (
- backfillBatchSizeName = "backfill-batch-size"
- backfillWorkerCountName = "backfill-worker-count"
- // EnableExperimentalBackfill enables backfill for checkpoint synced nodes.
- // This flag will be removed onced backfill is enabled by default.
- EnableExperimentalBackfill = &cli.BoolFlag{
- Name: "enable-experimental-backfill",
- Usage: "Backfill is still experimental at this time." +
- "It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
- }
- // BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
- // at the cost of higher memory.
- BackfillBatchSize = &cli.Uint64Flag{
- Name: backfillBatchSizeName,
- Usage: "Number of blocks per backfill batch. " +
- "A larger number will request more blocks at once from peers, but also consume more system memory to " +
- "hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
- Value: 64,
- }
- // BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
- // network utilization at the cost of higher memory.
- BackfillWorkerCount = &cli.IntFlag{
- Name: backfillWorkerCountName,
- Usage: "Number of concurrent backfill batch requests. " +
- "A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
- "consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
- "average block size (~2MB before deneb) to find the right number for your system. " +
- "This has a multiplicatice effect with " + backfillBatchSizeName,
- Value: 2,
- }
- )
// BeaconNodeOptions sets the appropriate functional opts on the *node.BeaconNode value, to decouple options
// from flag parsing.
func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
opt := func(node *node.BeaconNode) (err error) {
node.BackfillOpts = []backfill.ServiceOption{
- backfill.WithBatchSize(c.Uint64(BackfillBatchSize.Name)),
- backfill.WithWorkerCount(c.Int(BackfillWorkerCount.Name)),
- backfill.WithEnableBackfill(c.Bool(EnableExperimentalBackfill.Name)),
+ backfill.WithBatchSize(c.Uint64(flags.BackfillBatchSize.Name)),
+ backfill.WithWorkerCount(c.Int(flags.BackfillWorkerCount.Name)),
+ backfill.WithEnableBackfill(c.Bool(flags.EnableExperimentalBackfill.Name)),
}
return nil
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/cmd"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/storage"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v4/config/features"

View File

@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//cmd:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"//config/params:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",

View File

@@ -3,6 +3,7 @@ package features
import (
"time"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/urfave/cli/v2"
)
@@ -161,6 +162,7 @@ var devModeFlags = []cli.Flag{
enableVerboseSigVerification,
EnableEIP4881,
enableExperimentalState,
+ backfill.EnableExperimentalBackfill,
}
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.

View File

@@ -283,7 +283,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
// on our features or the beacon index is a multiplier of 2 (idea is to split nodes
// equally down the line with one group having feature flags and the other without
// feature flags; this is to allow A-B testing on new features)
- if !config.TestFeature || index%2 == 0 {
+ if !config.TestFeature || index != 1 {
args = append(args, features.E2EBeaconChainFlags...)
}
if config.UseBuilder {