Compare commits

...

43 Commits

Author SHA1 Message Date
prestonvanloon
2838683245 Revert "Revert "revert costing change""
This reverts commit 64db5ad361.

I'm done looking at this.
2022-09-02 13:05:20 -05:00
Preston Van Loon
476d188fcc Update beacon-chain/blockchain/error.go
Co-authored-by: Potuz <potuz@prysmaticlabs.com>
2022-09-02 13:04:14 -05:00
prestonvanloon
64db5ad361 Revert "revert costing change"
This reverts commit d0c7cedbba.
2022-09-02 13:03:52 -05:00
prestonvanloon
d0c7cedbba revert costing change 2022-09-02 13:02:45 -05:00
prestonvanloon
b20937e97a go mod tidy 2022-09-02 09:44:45 -05:00
prestonvanloon
52a581362c restore godoc comment 2022-09-02 09:25:19 -05:00
prestonvanloon
537142f79e ensure db doesn't have junk root 2022-09-02 09:23:51 -05:00
prestonvanloon
abb32e5f27 Skip saving bad state summaries, return multiple errors 2022-09-02 09:22:32 -05:00
prestonvanloon
08c3d149bf Tests for pending blocks cache insertion 2022-09-02 09:01:55 -05:00
prestonvanloon
0d5b7d093a Add test requiring a block to exist 2022-09-02 08:53:34 -05:00
prestonvanloon
80f30a0d5b more PR feedback 2022-09-02 08:38:49 -05:00
prestonvanloon
9580f64fea Minimal changes. Terence feedback 2022-09-02 08:34:00 -05:00
nisdas
b3c3f7f40e hack 2022-09-02 17:39:22 +08:00
nisdas
49f2e12030 fix tests 2022-09-02 17:19:56 +08:00
prestonvanloon
9aa4407daf A few more passing test cases 2022-09-01 22:44:24 -05:00
prestonvanloon
05b3ef1fcd Fix mosts tests 2022-09-01 22:18:57 -05:00
Preston Van Loon
8d086e433d Merge branch 'develop' into ss-missing-blocks 2022-09-01 15:53:35 -05:00
terencechain
8627fe72e8 Remove activation/exit queue metrics (#11389)
* Remove activation/exit queue metrics

* Gaz

* Rm unused vars

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-09-01 19:11:25 +00:00
Raul Jordan
65bf3d0fa8 Fix Div By 0 in Small Helper (#11390) 2022-09-01 18:26:28 +00:00
Raul Jordan
a5da9aedd4 Add in P2P Metrics for Mainnet (#11386)
* connected peers gauge vec

* build

* add in gossip metric

* clean
2022-09-01 18:00:54 +00:00
Preston Van Loon
3833f2770e Merge branch 'develop' into ss-missing-blocks 2022-09-01 12:53:33 -05:00
prestonvanloon
e6c04e5dad fix interfacechecker 2022-09-01 12:22:07 -05:00
prestonvanloon
5e05398daf Lots of changes... 2022-09-01 11:51:17 -05:00
Potuz
e1ab034d25 Defensive pulls protoarray (#11385)
* Defensive pull tips protoarray

* unit test

* gaz

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-09-01 15:05:44 +00:00
Potuz
84bc8f3d64 Fix fillInMissingBlocks (#11353)
* Fix fillInMissingBlocks

Only check that the chain's parent is in forkchoice, rather than it
being the finalized checkpoint. Forkchoice anyway guarantees that the
chain will be a descendant of the finalized checkpoint.

* ensure root is not zero

* fix tests

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-09-01 14:40:32 +00:00
prestonvanloon
299fa3494a Enforce block exists in db before saving state summary 2022-09-01 09:35:18 -05:00
Radosław Kapka
c4deb84012 Simplify BeaconBlockIsNil() (#11373)
* Simplify `BeaconBlockIsNil()`

* remove unused code

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2022-09-01 03:40:20 +00:00
kasey
488e19e428 less ominous --weak-subjectivity-checkpoint warning (#11362)
* fix #11361

* change log level to debug

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
2022-09-01 01:56:56 +00:00
Raul Jordan
bcaae1c440 Performance Metrics for Prysm (#11377)
* atts performance and blocks

* idiomatic observe

* all attestation related errors

* block metrics

* db metrics

* metrics func

* rem old metrics

* naming

* rem metric

* rem unneeded

* fix

* fix up

* rev

* fix

* rem
2022-09-01 01:26:19 +00:00
terencechain
587ba83aca Better batch block processing warning (#11372)
* Better batch block warning

* Fix test

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-09-01 00:57:55 +00:00
terencechain
091f16b26c Don't hard shutdown if mev-boost / relay is not available (#11380)
* Don't hard shtudown if mev-boost / relay is not available

* Add else
2022-08-31 23:58:27 +00:00
Potuz
fb9626fdd7 Feature flag to disregard deposit contract (#11370)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-08-31 22:35:59 +00:00
terencechain
c638e114db Add new metrics (#11374)
* Better batch block warning

* New metrics

* Revert "Better batch block warning"

This reverts commit e21fcfcebe.

* More metrics

* Add activation and exit queues

* Gaz
2022-08-31 18:05:50 -04:00
terencechain
b1e08307ed Fix time to duty to round slot number (#11371)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-08-31 19:54:44 +00:00
kasey
cac5d0f234 giving commands more clear names per issue #11287 (#11360)
* giving commands more clear names per issue #11287

* mark the top-level help text for cpt deprecated

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2022-08-31 13:18:35 -05:00
james-prysm
52d48b328f Improve Validator Index RPC Error Handling (#11363)
* adding in nil check for head

* adding changes based on feedback

* Update beacon-chain/rpc/prysm/v1alpha1/validator/server_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2022-08-31 16:42:49 +00:00
Nishant Das
9729b2ec77 Remove The Header Time Check (#11329)
* remove the check

* remove function and tests

* dead code

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-08-31 14:54:29 +00:00
terencechain
7aa3776aa6 Log tx count only on payload (#11368) 2022-08-31 14:38:44 +02:00
Potuz
760c71ef77 Only update finalized checkpoint in DB if it's newer (#11356)
* Only update finalized checkpoint in DB if it's newer

Do not save to DB a finalized checkpoint that it's older than the
current one.

* Add a test

* Add more strict condition check

* Revert params.SetupTestConfigCleanupWithLock(t)

* Remove new line

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
2022-08-31 00:16:22 +00:00
james-prysm
6c209db3ca fixing json unmarshalling (#11357)
* fixing json unmarshalling

* adding unit test for no conent

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-08-30 19:31:23 +00:00
Raul Jordan
0725905797 Informative Errors on Execution Client Connection Issues (#11359)
* add err auth help

* error working

* add err auth fix
2022-08-30 19:09:42 +00:00
terencechain
166f8a1eb6 Log corerct header value (#11354)
* Log corerct header value

* gaz

* Go fmt

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2022-08-30 16:23:21 +00:00
Radosław Kapka
85896e994e Explain the purpose of deprecatedBeaconFlags (#11355) 2022-08-30 15:47:36 +00:00
89 changed files with 1086 additions and 290 deletions

View File

@@ -274,9 +274,6 @@ func non200Err(response *http.Response) error {
if err != nil {
body = "(Unable to read response body.)"
} else {
if jsonErr := json.Unmarshal(bodyBytes, &errMessage); jsonErr != nil {
return errors.Wrap(jsonErr, "unable to read response body")
}
body = "response body:\n" + string(bodyBytes)
}
msg := fmt.Sprintf("code=%d, url=%s, body=%s", response.StatusCode, response.Request.URL, body)
@@ -285,13 +282,25 @@ func non200Err(response *http.Response) error {
log.WithError(ErrNoContent).Debug(msg)
return ErrNoContent
case 400:
if jsonErr := json.Unmarshal(bodyBytes, &errMessage); jsonErr != nil {
return errors.Wrap(jsonErr, "unable to read response body")
}
log.WithError(ErrBadRequest).Debug(msg)
return errors.Wrap(ErrBadRequest, errMessage.Message)
case 404:
if jsonErr := json.Unmarshal(bodyBytes, &errMessage); jsonErr != nil {
return errors.Wrap(jsonErr, "unable to read response body")
}
log.WithError(ErrNotFound).Debug(msg)
return errors.Wrap(ErrNotFound, errMessage.Message)
default:
case 500:
if jsonErr := json.Unmarshal(bodyBytes, &errMessage); jsonErr != nil {
return errors.Wrap(jsonErr, "unable to read response body")
}
log.WithError(ErrNotOK).Debug(msg)
return errors.Wrap(ErrNotOK, errMessage.Message)
default:
log.WithError(ErrNotOK).Debug(msg)
return errors.Wrap(ErrNotOK, fmt.Sprintf("unsupported error code: %d", response.StatusCode))
}
}

View File

@@ -144,6 +144,23 @@ func TestClient_GetHeader(t *testing.T) {
_, err := c.GetHeader(ctx, slot, bytesutil.ToBytes32(parentHash), bytesutil.ToBytes48(pubkey))
require.ErrorIs(t, err, ErrNotOK)
hc = &http.Client{
Transport: roundtrip(func(r *http.Request) (*http.Response, error) {
require.Equal(t, expectedPath, r.URL.Path)
return &http.Response{
StatusCode: http.StatusNoContent,
Body: io.NopCloser(bytes.NewBuffer([]byte("No header is available."))),
Request: r.Clone(ctx),
}, nil
}),
}
c = &Client{
hc: hc,
baseURL: &url.URL{Host: "localhost:3500", Scheme: "http"},
}
_, err = c.GetHeader(ctx, slot, bytesutil.ToBytes32(parentHash), bytesutil.ToBytes48(pubkey))
require.ErrorIs(t, err, ErrNoContent)
hc = &http.Client{
Transport: roundtrip(func(r *http.Request) (*http.Response, error) {
require.Equal(t, expectedPath, r.URL.Path)

View File

@@ -530,13 +530,13 @@ func TestService_IsOptimisticForRoot_DB_ProtoArray(t *testing.T) {
optimisticBlock := util.NewBeaconBlock()
optimisticBlock.Block.Slot = 97
optimisticRoot, err := optimisticBlock.Block.HashTreeRoot()
optimisticRoot, err := util.SaveBlock(t, ctx, beaconDB, optimisticBlock).Block().HashTreeRoot()
require.NoError(t, err)
util.SaveBlock(t, context.Background(), beaconDB, optimisticBlock)
validatedBlock := util.NewBeaconBlock()
validatedBlock.Block.Slot = 9
validatedRoot, err := validatedBlock.Block.HashTreeRoot()
validatedRoot, err := util.SaveBlock(t, ctx, beaconDB, validatedBlock).Block().HashTreeRoot()
require.NoError(t, err)
util.SaveBlock(t, context.Background(), beaconDB, validatedBlock)
@@ -563,10 +563,11 @@ func TestService_IsOptimisticForRoot_DB_ProtoArray(t *testing.T) {
require.NoError(t, err)
require.Equal(t, false, validated)
// Before the first finalized epoch, finalized root could be zeros.
validatedCheckpoint = &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
validatedCheckpoint = &ethpb.Checkpoint{Root: r[:]}
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, br))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: params.BeaconConfig().ZeroHash[:], Slot: 10}))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: r[:], Slot: 10}))
require.NoError(t, beaconDB.SaveLastValidatedCheckpoint(ctx, validatedCheckpoint))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: optimisticRoot[:], Slot: 11}))
@@ -621,10 +622,11 @@ func TestService_IsOptimisticForRoot_DB_DoublyLinkedTree(t *testing.T) {
require.NoError(t, err)
require.Equal(t, false, validated)
// Before the first finalized epoch, finalized root could be zeros.
validatedCheckpoint = &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
validatedCheckpoint = &ethpb.Checkpoint{Root: r[:]}
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, br))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: params.BeaconConfig().ZeroHash[:], Slot: 10}))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: r[:], Slot: 10}))
require.NoError(t, beaconDB.SaveLastValidatedCheckpoint(ctx, validatedCheckpoint))
require.NoError(t, beaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: optimisticRoot[:], Slot: 11}))

View File

@@ -23,6 +23,8 @@ var (
errNotOptimisticCandidate = errors.New("block is not suitable for optimistic sync")
// errBlockNotFoundInCacheOrDB is returned when a block is not found in the cache or DB.
errBlockNotFoundInCacheOrDB = errors.New("block not found in cache or db")
// errNilBlockInCache is returned when a nil block is returned from the cache.
ErrNilBlockInCache = errors.New("nil block returned from the cache")
// errNilStateFromStategen is returned when a nil state is returned from the state generator.
errNilStateFromStategen = errors.New("justified state can't be nil")
// errWSBlockNotFound is returned when a block is not found in the WS cache or DB.

View File

@@ -89,7 +89,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho
}
return payloadID, nil
case execution.ErrInvalidPayloadStatus:
newPayloadInvalidNodeCount.Inc()
forkchoiceUpdatedInvalidNodeCount.Inc()
headRoot := arg.headRoot
if len(lastValidHash) == 0 {
lastValidHash = defaultLatestValidHash

View File

@@ -1083,14 +1083,14 @@ func Test_UpdateLastValidatedCheckpoint(t *testing.T) {
require.NoError(t, fcs.InsertNode(ctx, state, blkRoot))
fcs.SetOriginRoot(genesisRoot)
genesisSummary := &ethpb.StateSummary{
Root: genesisStateRoot[:],
Root: genesisRoot[:],
Slot: 0,
}
require.NoError(t, beaconDB.SaveStateSummary(ctx, genesisSummary))
// Get last validated checkpoint
origCheckpoint, err := service.cfg.BeaconDB.LastValidatedCheckpoint(ctx)
require.NoError(t, err)
// Set last validated checkpoint to a junk root to prevent issues
// with saving it due to the state summary.
origCheckpoint := &ethpb.Checkpoint{Root: genesisRoot[:], Epoch: 0}
require.NoError(t, beaconDB.SaveLastValidatedCheckpoint(ctx, origCheckpoint))
// Optimistic finalized checkpoint
@@ -1155,6 +1155,18 @@ func Test_UpdateLastValidatedCheckpoint(t *testing.T) {
require.Equal(t, false, optimistic)
require.DeepEqual(t, validCheckpoint.Root, cp.Root)
require.Equal(t, validCheckpoint.Epoch, cp.Epoch)
// Checkpoint with a lower epoch
oldCp, err := service.cfg.BeaconDB.FinalizedCheckpoint(ctx)
require.NoError(t, err)
invalidCp := &ethpb.Checkpoint{
Epoch: oldCp.Epoch - 1,
}
// Nothing should happen as we no-op on an invalid checkpoint.
require.NoError(t, service.updateFinalized(ctx, invalidCp))
got, err := service.cfg.BeaconDB.FinalizedCheckpoint(ctx)
require.NoError(t, err)
require.DeepEqual(t, oldCp, got)
}
func TestService_removeInvalidBlockAndState(t *testing.T) {

View File

@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
consensusblocks "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
@@ -160,10 +161,16 @@ func TestCacheJustifiedStateBalances_CanCache(t *testing.T) {
ctx := context.Background()
state, _ := util.DeterministicGenesisState(t, 100)
r := [32]byte{'a'}
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: r[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), state, r))
balances, err := service.justifiedBalances.get(ctx, r)
newBlock := util.NewBeaconBlock()
newBlock.Block.Slot = 20
rt, err := newBlock.Block.HashTreeRoot()
assert.NoError(t, err)
wrappedBlk, err := consensusblocks.NewSignedBeaconBlock(newBlock)
assert.NoError(t, err)
assert.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wrappedBlk))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Root: rt[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), state, rt))
balances, err := service.justifiedBalances.get(ctx, rt)
require.NoError(t, err)
require.DeepEqual(t, balances, state.Balances(), "Incorrect justified balances")
}

View File

@@ -75,6 +75,17 @@ func (s *Service) getInitSyncBlocks() []interfaces.SignedBeaconBlock {
return blks
}
// getInitSyncBlockFromCache returns a block from the initial sync blocks cache. This method
func (s *Service) getInitSyncBlockFromCache(r [32]byte) (interfaces.SignedBeaconBlock, error) {
s.initSyncBlocksLock.RLock()
defer s.initSyncBlocksLock.RUnlock()
b := s.initSyncBlocks[r]
if b.IsNil() {
return nil, ErrNilBlockInCache
}
return b, nil
}
// This clears out the initial sync blocks cache.
func (s *Service) clearInitSyncBlocks() {
s.initSyncBlocksLock.Lock()

View File

@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v3/config/params"
consensusBlocks "github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
@@ -51,10 +52,15 @@ func logStateTransitionData(b interfaces.BeaconBlock) error {
}
log = log.WithField("payloadHash", fmt.Sprintf("%#x", bytesutil.Trunc(p.BlockHash())))
txs, err := p.Transactions()
if err != nil {
switch {
case errors.Is(err, consensusBlocks.ErrUnsupportedGetter):
case err != nil:
return err
default:
log = log.WithField("txCount", len(txs))
txsPerSlotCount.Set(float64(len(txs)))
}
log = log.WithField("txCount", len(txs))
}
log.Info("Finished applying state transition")
return nil

View File

@@ -158,10 +158,47 @@ var (
Name: "forkchoice_updated_optimistic_node_count",
Help: "Count the number of optimistic nodes after forkchoiceUpdated EE call",
})
forkchoiceUpdatedInvalidNodeCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "forkchoice_updated_invalid_node_count",
Help: "Count the number of invalid nodes after forkchoiceUpdated EE call",
})
txsPerSlotCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "txs_per_slot_count",
Help: "Count the number of txs per slot",
})
missedPayloadIDFilledCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "missed_payload_id_filled_count",
Help: "",
})
onBlockProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "on_block_processing_milliseconds",
Help: "Total time in milliseconds to complete a call to onBlock()",
})
stateTransitionProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "state_transition_processing_milliseconds",
Help: "Total time to call a state transition in onBlock()",
})
processAttsElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "process_attestations_milliseconds",
Help: "Captures latency for process attestations (forkchoice) in milliseconds",
Buckets: []float64{1, 5, 20, 100, 500, 1000},
},
)
newAttHeadElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "new_att_head_milliseconds",
Help: "Captures latency for new attestation head in milliseconds",
Buckets: []float64{1, 5, 20, 100, 500, 1000},
},
)
newBlockHeadElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "new_block_head_milliseconds",
Help: "Captures latency for new block head in milliseconds",
Buckets: []float64{1, 5, 20, 100, 500, 1000},
},
)
)
// reportSlotMetrics reports slot related metrics.

View File

@@ -307,7 +307,11 @@ func TestStore_SaveCheckpointState(t *testing.T) {
s, err := util.NewBeaconState()
require.NoError(t, err)
err = s.SetFinalizedCheckpoint(&ethpb.Checkpoint{Root: bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength)})
b0 := util.NewBeaconBlock()
b0.Block.Slot = 0
r0, err := util.SaveBlock(t, ctx, beaconDB, b0).Block().HashTreeRoot()
require.NoError(t, err)
err = s.SetFinalizedCheckpoint(&ethpb.Checkpoint{Root: r0[:]})
require.NoError(t, err)
val := &ethpb.Validator{
PublicKey: bytesutil.PadTo([]byte("foo"), 48),
@@ -317,20 +321,27 @@ func TestStore_SaveCheckpointState(t *testing.T) {
require.NoError(t, err)
err = s.SetBalances([]uint64{0})
require.NoError(t, err)
r := [32]byte{'g'}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, r))
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, r0))
cp1 := &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength)}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, bytesutil.ToBytes32([]byte{'A'})))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bytesutil.PadTo([]byte{'A'}, fieldparams.RootLength)}))
b1 := util.NewBeaconBlock()
b1.Block.Slot = 1
r1, err := util.SaveBlock(t, ctx, beaconDB, b1).Block().HashTreeRoot()
require.NoError(t, err)
cp1 := &ethpb.Checkpoint{Epoch: 1, Root: r1[:]}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, r1))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r1[:]}))
s1, err := service.getAttPreState(ctx, cp1)
require.NoError(t, err)
assert.Equal(t, 1*params.BeaconConfig().SlotsPerEpoch, s1.Slot(), "Unexpected state slot")
cp2 := &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte{'B'}, fieldparams.RootLength)}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, bytesutil.ToBytes32([]byte{'B'})))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bytesutil.PadTo([]byte{'B'}, fieldparams.RootLength)}))
b2 := util.NewBeaconBlock()
b2.Block.Slot = 2
r2, err := util.SaveBlock(t, ctx, beaconDB, b2).Block().HashTreeRoot()
require.NoError(t, err)
cp2 := &ethpb.Checkpoint{Epoch: 2, Root: r2[:]}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, r2))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r2[:]}))
s2, err := service.getAttPreState(ctx, cp2)
require.NoError(t, err)
assert.Equal(t, 2*params.BeaconConfig().SlotsPerEpoch, s2.Slot(), "Unexpected state slot")
@@ -348,9 +359,13 @@ func TestStore_SaveCheckpointState(t *testing.T) {
assert.Equal(t, 2*params.BeaconConfig().SlotsPerEpoch, s2.Slot(), "Unexpected state slot")
require.NoError(t, s.SetSlot(params.BeaconConfig().SlotsPerEpoch+1))
cp3 := &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte{'C'}, fieldparams.RootLength)}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, bytesutil.ToBytes32([]byte{'C'})))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bytesutil.PadTo([]byte{'C'}, fieldparams.RootLength)}))
b3 := util.NewBeaconBlock()
b3.Block.Slot = 3
r3, err := util.SaveBlock(t, ctx, beaconDB, b3).Block().HashTreeRoot()
require.NoError(t, err)
cp3 := &ethpb.Checkpoint{Epoch: 1, Root: r3[:]}
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, s, r3))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r3[:]}))
s3, err := service.getAttPreState(ctx, cp3)
require.NoError(t, err)
assert.Equal(t, s.Slot(), s3.Slot(), "Unexpected state slot")

View File

@@ -98,6 +98,7 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
if err := consensusblocks.BeaconBlockIsNil(signed); err != nil {
return invalidBlock{error: err}
}
startTime := time.Now()
b := signed.Block()
preState, err := s.getBlockPreState(ctx, b)
@@ -115,10 +116,13 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
if err != nil {
return err
}
stateTransitionStartTime := time.Now()
postState, err := transition.ExecuteStateTransition(ctx, preState, signed)
if err != nil {
return invalidBlock{error: err}
}
stateTransitionProcessingTime.Observe(float64(time.Since(stateTransitionStartTime).Milliseconds()))
postStateVersion, postStateHeader, err := getStateVersionAndPayload(postState)
if err != nil {
return err
@@ -182,10 +186,14 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
msg := fmt.Sprintf("could not read balances for state w/ justified checkpoint %#x", justified.Root)
return errors.Wrap(err, msg)
}
start := time.Now()
headRoot, err := s.cfg.ForkChoiceStore.Head(ctx, balances)
if err != nil {
log.WithError(err).Warn("Could not update head")
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if err := s.notifyEngineIfChangedHead(ctx, headRoot); err != nil {
return err
}
@@ -262,7 +270,11 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.SignedBeaconBlo
}
defer reportAttestationInclusion(b)
return s.handleEpochBoundary(ctx, postState)
if err := s.handleEpochBoundary(ctx, postState); err != nil {
return err
}
onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return nil
}
func getStateVersionAndPayload(st state.BeaconState) (int, *enginev1.ExecutionPayloadHeader, error) {
@@ -396,13 +408,15 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac
tracing.AnnotateError(span, err)
return err
}
if err := s.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{
if err := s.cfg.BeaconDB.SaveStateSummariesWithPendingBlocks(ctx, []*ethpb.StateSummary{{
Slot: b.Block().Slot(),
Root: blockRoots[i][:],
}); err != nil {
}}, s.getInitSyncBlockFromCache); err != nil {
tracing.AnnotateError(span, err)
return err
}
if i > 0 && jCheckpoints[i].Epoch > jCheckpoints[i-1].Epoch {
if err := s.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx, jCheckpoints[i]); err != nil {
tracing.AnnotateError(span, err)
@@ -433,6 +447,17 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.SignedBeac
}
for r, st := range boundaries {
// Ensure the cached block is saved to db before saving state boundary.
if !s.cfg.BeaconDB.HasBlock(ctx, r) {
b, err := s.getInitSyncBlockFromCache(r)
if err != nil || b.IsNil() {
log.WithField("block root", fmt.Sprintf("%#x", bytesutil.Trunc(r[:]))).Warn("Could not find block for boundary state root in cache")
} else {
if err := s.cfg.BeaconDB.SaveBlock(ctx, b); err != nil {
return err
}
}
}
if err := s.cfg.StateGen.SaveState(ctx, r, st); err != nil {
return err
}

View File

@@ -135,11 +135,21 @@ func (s *Service) verifyBlkFinalizedSlot(b interfaces.BeaconBlock) error {
}
// updateFinalized saves the init sync blocks, finalized checkpoint, migrates
// to cold old states and saves the last validated checkpoint to DB
// to cold old states and saves the last validated checkpoint to DB. It returns
// early if the new checkpoint is older than the one on db.
func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "blockChain.updateFinalized")
defer span.End()
// return early if new checkpoint is not newer than the one in DB
currentFinalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return err
}
if cp.Epoch <= currentFinalized.Epoch {
return nil
}
// Blocks need to be saved so that we can retrieve finalized block from
// DB when migrating states.
if err := s.cfg.BeaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
@@ -267,7 +277,7 @@ func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk interfa
if len(pendingNodes) == 1 {
return nil
}
if root != s.ensureRootNotZeros(finalized.Root) {
if root != s.ensureRootNotZeros(finalized.Root) && !s.ForkChoicer().HasNode(root) {
return errNotDescendantOfFinalized
}
return s.cfg.ForkChoiceStore.InsertOptimisticChain(ctx, pendingNodes)

View File

@@ -391,6 +391,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
}
err = service.onBlockBatch(ctx, blks, blkRoots)
require.NoError(t, err)
require.NoError(t, service.Stop())
}
func TestCachedPreState_CanGetFromStateSummary_ProtoArray(t *testing.T) {
@@ -1350,7 +1351,14 @@ func TestInsertFinalizedDeposits(t *testing.T) {
gs = gs.Copy()
assert.NoError(t, gs.SetEth1Data(&ethpb.Eth1Data{DepositCount: 10}))
assert.NoError(t, gs.SetEth1DepositIndex(8))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, [32]byte{'m', 'o', 'c', 'k'}, gs))
newBlock := util.NewBeaconBlock()
newBlock.Block.Slot = 20
rt, err := newBlock.Block.HashTreeRoot()
assert.NoError(t, err)
wrappedBlk, err := consensusblocks.NewSignedBeaconBlock(newBlock)
assert.NoError(t, err)
assert.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wrappedBlk))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, rt, gs))
zeroSig := [96]byte{}
for i := uint64(0); i < uint64(4*params.BeaconConfig().SlotsPerEpoch); i++ {
root := []byte(strconv.Itoa(int(i)))
@@ -1361,7 +1369,7 @@ func TestInsertFinalizedDeposits(t *testing.T) {
Signature: zeroSig[:],
}, Proof: [][]byte{root}}, 100+i, int64(i), bytesutil.ToBytes32(root)))
}
assert.NoError(t, service.insertFinalizedDeposits(ctx, [32]byte{'m', 'o', 'c', 'k'}))
assert.NoError(t, service.insertFinalizedDeposits(ctx, rt))
fDeposits := depositCache.FinalizedDeposits(ctx)
assert.Equal(t, 7, int(fDeposits.MerkleTrieIndex), "Finalized deposits not inserted correctly")
deps := depositCache.AllDeposits(ctx, big.NewInt(107))
@@ -1384,11 +1392,27 @@ func TestInsertFinalizedDeposits_MultipleFinalizedRoutines(t *testing.T) {
gs = gs.Copy()
assert.NoError(t, gs.SetEth1Data(&ethpb.Eth1Data{DepositCount: 7}))
assert.NoError(t, gs.SetEth1DepositIndex(6))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, [32]byte{'m', 'o', 'c', 'k'}, gs))
newBlock := util.NewBeaconBlock()
newBlock.Block.Slot = 20
rt, err := newBlock.Block.HashTreeRoot()
assert.NoError(t, err)
wrappedBlk, err := consensusblocks.NewSignedBeaconBlock(newBlock)
assert.NoError(t, err)
assert.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wrappedBlk))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, rt, gs))
gs2 := gs.Copy()
assert.NoError(t, gs2.SetEth1Data(&ethpb.Eth1Data{DepositCount: 15}))
assert.NoError(t, gs2.SetEth1DepositIndex(13))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, [32]byte{'m', 'o', 'c', 'k', '2'}, gs2))
newBlock2 := util.NewBeaconBlock()
newBlock2.Block.Slot = 30
rt2, err := newBlock2.Block.HashTreeRoot()
assert.NoError(t, err)
wrappedBlk2, err := consensusblocks.NewSignedBeaconBlock(newBlock2)
assert.NoError(t, err)
assert.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wrappedBlk2))
assert.NoError(t, service.cfg.StateGen.SaveState(ctx, rt2, gs2))
zeroSig := [96]byte{}
for i := uint64(0); i < uint64(4*params.BeaconConfig().SlotsPerEpoch); i++ {
root := []byte(strconv.Itoa(int(i)))
@@ -1402,7 +1426,7 @@ func TestInsertFinalizedDeposits_MultipleFinalizedRoutines(t *testing.T) {
// Insert 3 deposits before hand.
depositCache.InsertFinalizedDeposits(ctx, 2)
assert.NoError(t, service.insertFinalizedDeposits(ctx, [32]byte{'m', 'o', 'c', 'k'}))
assert.NoError(t, service.insertFinalizedDeposits(ctx, rt))
fDeposits := depositCache.FinalizedDeposits(ctx)
assert.Equal(t, 5, int(fDeposits.MerkleTrieIndex), "Finalized deposits not inserted correctly")
@@ -1412,7 +1436,7 @@ func TestInsertFinalizedDeposits_MultipleFinalizedRoutines(t *testing.T) {
}
// Insert New Finalized State with higher deposit count.
assert.NoError(t, service.insertFinalizedDeposits(ctx, [32]byte{'m', 'o', 'c', 'k', '2'}))
assert.NoError(t, service.insertFinalizedDeposits(ctx, rt2))
fDeposits = depositCache.FinalizedDeposits(ctx)
assert.Equal(t, 12, int(fDeposits.MerkleTrieIndex), "Finalized deposits not inserted correctly")
deps = depositCache.AllDeposits(ctx, big.NewInt(112))
@@ -1425,12 +1449,12 @@ func TestRemoveBlockAttestationsInPool_Canonical(t *testing.T) {
genesis, keys := util.DeterministicGenesisState(t, 64)
b, err := util.GenerateFullBlock(genesis, keys, util.DefaultBlockGenConfig(), 1)
assert.NoError(t, err)
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
service := setupBeaconChain(t, beaconDB)
r, err := util.SaveBlock(t, ctx, beaconDB, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, r))

View File

@@ -147,17 +147,22 @@ func (s *Service) UpdateHead(ctx context.Context) error {
s.processAttestationsLock.Lock()
defer s.processAttestationsLock.Unlock()
start := time.Now()
s.processAttestations(ctx)
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
justified := s.ForkChoicer().JustifiedCheckpoint()
balances, err := s.justifiedBalances.get(ctx, justified.Root)
if err != nil {
return err
}
start = time.Now()
newHeadRoot, err := s.cfg.ForkChoiceStore.Head(ctx, balances)
if err != nil {
log.WithError(err).Warn("Resolving fork due to new attestation")
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
s.headLock.RLock()
if s.headRoot() != newHeadRoot {
log.WithFields(logrus.Fields{

View File

@@ -119,8 +119,11 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
require.NoError(t, err)
stateGen := stategen.New(beaconDB)
// Safe a state in stategen to purposes of testing a service stop / shutdown.
require.NoError(t, stateGen.SaveState(ctx, bytesutil.ToBytes32(bState.FinalizedCheckpoint().Root), bState))
// Save a state and block in stategen to purposes of testing a service stop / shutdown.
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, stateGen.SaveState(ctx, r, bState))
opts := []Option{
WithDatabase(beaconDB),
@@ -192,8 +195,6 @@ func TestChainStartStop_GenesisZeroHashes(t *testing.T) {
require.NoError(t, beaconDB.SaveState(ctx, s, blkRoot))
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, blkRoot))
require.NoError(t, beaconDB.SaveBlock(ctx, wsb))
require.NoError(t, beaconDB.SaveJustifiedCheckpoint(ctx, &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}))
require.NoError(t, beaconDB.SaveFinalizedCheckpoint(ctx, &ethpb.Checkpoint{Root: blkRoot[:]}))
chainService.cfg.FinalizedStateAtStartUp = s
// Test the start function.
chainService.Start()
@@ -381,7 +382,7 @@ func TestChainService_SaveHeadNoDB(t *testing.T) {
}
blk := util.NewBeaconBlock()
blk.Block.Slot = 1
r, err := blk.HashTreeRoot()
r, err := util.SaveBlock(t, ctx, beaconDB, blk).Block().HashTreeRoot()
require.NoError(t, err)
newState, err := util.NewBeaconState()
require.NoError(t, err)

View File

@@ -30,8 +30,7 @@ type WeakSubjectivityVerifier struct {
// NewWeakSubjectivityVerifier validates a checkpoint, and if valid, uses it to initialize a weak subjectivity verifier.
func NewWeakSubjectivityVerifier(wsc *ethpb.Checkpoint, db weakSubjectivityDB) (*WeakSubjectivityVerifier, error) {
if wsc == nil || len(wsc.Root) == 0 || wsc.Epoch == 0 {
log.Info("--weak-subjectivity-checkpoint not provided. Prysm recommends providing a weak subjectivity checkpoint " +
"for nodes synced from genesis, or manual verification of block and state roots for checkpoint sync nodes.")
log.Debug("--weak-subjectivity-checkpoint not provided")
return &WeakSubjectivityVerifier{
enabled: false,
}, nil

View File

@@ -2,7 +2,6 @@ package builder
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
@@ -66,12 +65,12 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
// Is the builder up?
if err := s.c.Status(ctx); err != nil {
return nil, fmt.Errorf("could not connect to builder: %v", err)
log.WithError(err).Error("Failed to check builder status")
} else {
log.WithField("endpoint", c.NodeURL()).Info("Builder has been configured")
log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " +
"Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!")
}
log.WithField("endpoint", c.NodeURL()).Info("Builder has been configured")
log.Warn("Outsourcing block construction to external builders adds non-trivial delay to block propagation time. " +
"Builder-constructed blocks or fallback blocks may get orphaned. Use at your own risk!")
}
return s, nil
}

View File

@@ -7,6 +7,7 @@ go_library(
"beacon_committee.go",
"block.go",
"genesis.go",
"metrics.go",
"randao.go",
"rewards_penalties.go",
"shuffle.go",

View File

@@ -51,10 +51,11 @@ func ValidateSlotTargetEpoch(data *ethpb.AttestationData) error {
// committee count as an argument allows cheaper computation at run time.
//
// Spec pseudocode definition:
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
//
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
modulo := uint64(1)
if committeeCount/params.BeaconConfig().TargetAggregatorsPerCommittee > 1 {
@@ -68,9 +69,10 @@ func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
// AggregateSignature returns the aggregated signature of the input attestations.
//
// Spec pseudocode definition:
// def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
// signatures = [attestation.signature for attestation in attestations]
// return bls.Aggregate(signatures)
//
// def get_aggregate_signature(attestations: Sequence[Attestation]) -> BLSSignature:
// signatures = [attestation.signature for attestation in attestations]
// return bls.Aggregate(signatures)
func AggregateSignature(attestations []*ethpb.Attestation) (bls.Signature, error) {
sigs := make([]bls.Signature, len(attestations))
var err error
@@ -95,14 +97,15 @@ func IsAggregated(attestation *ethpb.Attestation) bool {
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
func ComputeSubnetForAttestation(activeValCount uint64, att *ethpb.Attestation) uint64 {
return ComputeSubnetFromCommitteeAndSlot(activeValCount, att.Data.CommitteeIndex, att.Data.Slot)
}
@@ -112,14 +115,15 @@ func ComputeSubnetForAttestation(activeValCount uint64, att *ethpb.Attestation)
//
// Spec pseudocode definition:
// def compute_subnet_for_attestation(committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex) -> uint64:
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
// """
// Compute the correct subnet for an attestation for Phase 0.
// Note, this mimics expected future behavior where attestations will be mapped to their shard subnet.
// """
// slots_since_epoch_start = uint64(slot % SLOTS_PER_EPOCH)
// committees_since_epoch_start = committees_per_slot * slots_since_epoch_start
//
// return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT)
func ComputeSubnetFromCommitteeAndSlot(activeValCount uint64, comIdx types.CommitteeIndex, attSlot types.Slot) uint64 {
slotSinceStart := slots.SinceEpochStarts(attSlot)
comCount := SlotCommitteeCount(activeValCount)
@@ -133,13 +137,15 @@ func ComputeSubnetFromCommitteeAndSlot(activeValCount uint64, comIdx types.Commi
// slots.
//
// Example:
// ATTESTATION_PROPAGATION_SLOT_RANGE = 5
// clockDisparity = 24 seconds
// current_slot = 100
// invalid_attestation_slot = 92
// invalid_attestation_slot = 103
// valid_attestation_slot = 98
// valid_attestation_slot = 101
//
// ATTESTATION_PROPAGATION_SLOT_RANGE = 5
// clockDisparity = 24 seconds
// current_slot = 100
// invalid_attestation_slot = 92
// invalid_attestation_slot = 103
// valid_attestation_slot = 98
// valid_attestation_slot = 101
//
// In the attestation must be within the range of 95 to 102 in the example above.
func ValidateAttestationTime(attSlot types.Slot, genesisTime time.Time, clockDisparity time.Duration) error {
if err := slots.ValidateClock(attSlot, uint64(genesisTime.Unix())); err != nil {
@@ -170,13 +176,19 @@ func ValidateAttestationTime(attSlot types.Slot, genesisTime time.Time, clockDis
lowerBounds := lowerTime.Add(-clockDisparity)
// Verify attestation slot within the time range.
if attTime.Before(lowerBounds) || attTime.After(upperBounds) {
return fmt.Errorf(
"attestation slot %d not within attestation propagation range of %d to %d (current slot)",
attSlot,
lowerBoundsSlot,
currentSlot,
)
attError := fmt.Errorf(
"attestation slot %d not within attestation propagation range of %d to %d (current slot)",
attSlot,
lowerBoundsSlot,
currentSlot,
)
if attTime.Before(lowerBounds) {
attReceivedTooEarlyCount.Inc()
return attError
}
if attTime.After(upperBounds) {
attReceivedTooLateCount.Inc()
return attError
}
return nil
}

View File

@@ -0,0 +1,17 @@
package helpers
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
attReceivedTooEarlyCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_too_early_total",
Help: "Increased when an attestation is considered too early",
})
attReceivedTooLateCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "attestation_too_late_total",
Help: "Increased when an attestation is considered too late",
})
)

View File

@@ -75,6 +75,7 @@ type NoHeadAccessDatabase interface {
DeleteStates(ctx context.Context, blockRoots [][32]byte) error
SaveStateSummary(ctx context.Context, summary *ethpb.StateSummary) error
SaveStateSummaries(ctx context.Context, summaries []*ethpb.StateSummary) error
SaveStateSummariesWithPendingBlocks(ctx context.Context, summaries []*ethpb.StateSummary, blockCache func([32]byte) (interfaces.SignedBeaconBlock, error)) error
// Checkpoint operations.
SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error
SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error

View File

@@ -57,6 +57,7 @@ go_library(
"//monitoring/tracing:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
@@ -72,6 +73,7 @@ go_library(
"@io_etcd_go_bbolt//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_uber_go_multierr//:go_default_library",
],
)

View File

@@ -296,32 +296,40 @@ func (s *Store) SaveBlocks(ctx context.Context, blks []interfaces.SignedBeaconBl
indicesByBucket := createBlockIndicesFromBlock(ctx, blk.Block())
indicesForBlocks[i] = indicesByBucket
}
return s.db.Update(func(tx *bolt.Tx) error {
if err := s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
for i, blk := range blks {
for i := range blks {
if existingBlock := bkt.Get(blockRoots[i]); existingBlock != nil {
continue
}
if err := updateValueForIndices(ctx, indicesForBlocks[i], blockRoots[i], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
if features.Get().EnableOnlyBlindedBeaconBlocks {
blindedBlock, err := blk.ToBlinded()
if err != nil {
if !errors.Is(err, blocks.ErrUnsupportedVersion) {
return err
}
} else {
blk = blindedBlock
}
}
s.blockCache.Set(string(blockRoots[i]), blk, int64(len(encodedBlocks[i])))
if err := bkt.Put(blockRoots[i], encodedBlocks[i]); err != nil {
return err
}
}
return nil
})
}); err != nil {
return err
}
// Populate cache only after successful db update.
for i, blk := range blks {
if features.Get().EnableOnlyBlindedBeaconBlocks {
blindedBlock, err := blk.ToBlinded()
if err != nil {
if !errors.Is(err, blocks.ErrUnsupportedVersion) {
return err
}
} else {
blk = blindedBlock
}
}
s.blockCache.Set(string(blockRoots[i]), blk, int64(len(encodedBlocks[i])))
}
return nil
}
// SaveHeadBlockRoot to the db.

View File

@@ -9,6 +9,7 @@ var ErrDeleteJustifiedAndFinalized = errors.New("cannot delete finalized block o
// indicate that a value couldn't be found.
var ErrNotFound = errors.New("not found in db")
var ErrNotFoundState = errors.Wrap(ErrNotFound, "state not found")
var ErrNotFoundBlock = errors.Wrap(ErrNotFound, "block not found")
// ErrNotFoundOriginBlockRoot is an error specifically for the origin block root getter
var ErrNotFoundOriginBlockRoot = errors.Wrap(ErrNotFound, "OriginBlockRoot")

View File

@@ -55,6 +55,14 @@ var (
Name: "validator_entry_cache_delete_total",
Help: "The total number of cache deletes on the validator entry cache.",
})
stateReadingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_reading_milliseconds",
Help: "Milliseconds it takes to read a beacon state from the DB",
})
stateSavingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "db_beacon_state_saving_milliseconds",
Help: "Milliseconds it takes to save a beacon state to the DB",
})
)
// BlockCacheSize specifies 1000 slots worth of blocks cached, which

View File

@@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time"
"github.com/prysmaticlabs/prysm/v3/time/slots"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
@@ -30,6 +31,7 @@ import (
func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.State")
defer span.End()
startTime := time.Now()
enc, err := s.stateBytes(ctx, blockRoot)
if err != nil {
return nil, err
@@ -44,7 +46,12 @@ func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconStat
return nil, valErr
}
return s.unmarshalState(ctx, enc, valEntries)
st, err := s.unmarshalState(ctx, enc, valEntries)
if err != nil {
return nil, err
}
stateReadingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return st, err
}
// StateOrError is just like State(), except it only returns a non-error response
@@ -127,6 +134,7 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
if states == nil {
return errors.New("nil state")
}
startTime := time.Now()
multipleEncs := make([][]byte, len(states))
for i, st := range states {
stateBytes, err := marshalState(ctx, st)
@@ -136,7 +144,7 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
multipleEncs[i] = stateBytes
}
return s.db.Update(func(tx *bolt.Tx) error {
if err := s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateBucket)
for i, rt := range blockRoots {
indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
@@ -148,7 +156,11 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
}
}
return nil
})
}); err != nil {
return err
}
stateSavingTime.Observe(float64(time.Since(startTime).Milliseconds()))
return nil
}
type withValidators interface {
@@ -760,8 +772,10 @@ func createStateIndicesFromStateSlot(ctx context.Context, slot types.Slot) map[s
// Only following states would be kept:
// 1.) state_slot % archived_interval == 0. (e.g. archived_interval=2048, states with slot 2048, 4096... etc)
// 2.) archived_interval - archived_interval/3 < state_slot % archived_interval
// (e.g. archived_interval=2048, states with slots after 1365).
// This is to tolerate skip slots. Not every state lays on the boundary.
//
// (e.g. archived_interval=2048, states with slots after 1365).
// This is to tolerate skip slots. Not every state lays on the boundary.
//
// 3.) state with current finalized root
// 4.) unfinalized States
func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint types.Slot) error {

View File

@@ -2,11 +2,15 @@ package kv
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
"go.uber.org/multierr"
)
// SaveStateSummary saves a state summary object to the DB.
@@ -21,10 +25,20 @@ func (s *Store) SaveStateSummary(ctx context.Context, summary *ethpb.StateSummar
func (s *Store) SaveStateSummaries(ctx context.Context, summaries []*ethpb.StateSummary) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStateSummaries")
defer span.End()
return s.SaveStateSummariesWithPendingBlocks(ctx, summaries, nil)
}
// SaveStateSummariesWithPendingBlocks saves state summary objects to the DB.
func (s *Store) SaveStateSummariesWithPendingBlocks(ctx context.Context, summaries []*ethpb.StateSummary, blockCache func([32]byte) (interfaces.SignedBeaconBlock, error)) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStateSummariesWithPendingBlocks")
defer span.End()
// When we reach the state summary cache prune count,
// dump the cached state summaries to the DB.
if s.stateSummaryCache.len() >= stateSummaryCachePruneCount {
if err := s.ensureBlocksSaved(ctx, blockCache); err != nil {
return err
}
if err := s.saveCachedStateSummariesDB(ctx); err != nil {
return err
}
@@ -37,6 +51,25 @@ func (s *Store) SaveStateSummaries(ctx context.Context, summaries []*ethpb.State
return nil
}
func (s *Store) ensureBlocksSaved(ctx context.Context, blockCache func([32]byte) (interfaces.SignedBeaconBlock, error)) error {
if blockCache == nil {
return nil
}
summaries := s.stateSummaryCache.getAll()
blocks := make([]interfaces.SignedBeaconBlock, 0, len(summaries))
for _, ss := range summaries {
if s.HasBlock(ctx, bytesutil.ToBytes32(ss.Root)) {
log.WithField("blockRoot", fmt.Sprintf("%#x", ss.Root)).Trace("Block already saved")
continue
}
if b, err := blockCache(bytesutil.ToBytes32(ss.Root)); err == nil && !b.IsNil() {
blocks = append(blocks, b)
}
}
log.WithField("blocks", len(blocks)).WithField("summaries", len(summaries)).Debug("Saving blocks from state summary cache")
return s.SaveBlocks(ctx, blocks)
}
// StateSummary returns the state summary object from the db using input block root.
func (s *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.StateSummary")
@@ -96,10 +129,16 @@ func (s *Store) saveCachedStateSummariesDB(ctx context.Context) error {
}
encs[i] = enc
}
var errs []error
if err := s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSummaryBucket)
bBkt := tx.Bucket(blocksBucket)
ssBkt := tx.Bucket(stateSummaryBucket)
for i, s := range summaries {
if err := bucket.Put(s.Root, encs[i]); err != nil {
if bBkt.Get(s.Root) == nil {
errs = append(errs, errors.Wrapf(ErrNotFoundBlock, "failed to save state summary with block root %#x", s.Root))
continue
}
if err := ssBkt.Put(s.Root, encs[i]); err != nil {
return err
}
}
@@ -108,7 +147,7 @@ func (s *Store) saveCachedStateSummariesDB(ctx context.Context) error {
return err
}
s.stateSummaryCache.clear()
return nil
return multierr.Combine(errs...)
}
// deleteStateSummary deletes a state summary object from the db using input block root.

View File

@@ -4,18 +4,27 @@ import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
)
func TestStateSummary_CanSaveRetrieve(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
r1 := bytesutil.ToBytes32([]byte{'A'})
r2 := bytesutil.ToBytes32([]byte{'B'})
b1 := util.NewBeaconBlock()
b1.Block.Slot = 1
r1, err := util.SaveBlock(t, ctx, db, b1).Block().HashTreeRoot()
require.NoError(t, err)
b2 := util.NewBeaconBlock()
b2.Block.Slot = 2
r2, err := util.SaveBlock(t, ctx, db, b2).Block().HashTreeRoot()
require.NoError(t, err)
s1 := &ethpb.StateSummary{Slot: 1, Root: r1[:]}
// State summary should not exist yet.
@@ -43,23 +52,130 @@ func TestStateSummary_CanSaveRetrieve(t *testing.T) {
func TestStateSummary_CacheToDB(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
summaries := make([]*ethpb.StateSummary, stateSummaryCachePruneCount-1)
roots := make([][32]byte, stateSummaryCachePruneCount-1)
for i := range summaries {
summaries[i] = &ethpb.StateSummary{Slot: types.Slot(i), Root: bytesutil.PadTo(bytesutil.Uint64ToBytesLittleEndian(uint64(i)), 32)}
b := util.NewBeaconBlock()
b.Block.Slot = types.Slot(i)
b.Block.Body.Graffiti = bytesutil.PadTo([]byte{byte(i)}, 32)
r, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
summaries[i] = &ethpb.StateSummary{Slot: types.Slot(i), Root: r[:]}
roots[i] = r
}
require.NoError(t, db.SaveStateSummaries(context.Background(), summaries))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount-1)
require.NoError(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1000, Root: []byte{'a', 'b'}}))
b := util.NewBeaconBlock()
b.Block.Slot = types.Slot(1000)
r, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1000, Root: r[:]}))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount)
require.NoError(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1001, Root: []byte{'c', 'd'}}))
b = util.NewBeaconBlock()
b.Block.Slot = 1001
r, err = util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1001, Root: r[:]}))
require.Equal(t, db.stateSummaryCache.len(), 1)
for _, r := range roots {
require.Equal(t, true, db.HasStateSummary(context.Background(), r))
}
}
func TestStateSummary_CacheToDB_FailsIfMissingBlock(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
summaries := make([]*ethpb.StateSummary, stateSummaryCachePruneCount-1)
for i := range summaries {
r := bytesutil.Uint64ToBytesLittleEndian(uint64(i))
require.Equal(t, true, db.HasStateSummary(context.Background(), bytesutil.ToBytes32(r)))
b := util.NewBeaconBlock()
b.Block.Slot = types.Slot(i)
b.Block.Body.Graffiti = bytesutil.PadTo([]byte{byte(i)}, 32)
r, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
summaries[i] = &ethpb.StateSummary{Slot: types.Slot(i), Root: r[:]}
}
require.NoError(t, db.SaveStateSummaries(context.Background(), summaries))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount-1)
junkRoot := [32]byte{1, 2, 3}
require.NoError(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1000, Root: junkRoot[:]}))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount)
// Next insertion causes the buffer to flush.
b := util.NewBeaconBlock()
b.Block.Slot = 1001
r, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.ErrorIs(t, db.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1001, Root: r[:]}), ErrNotFoundBlock)
require.Equal(t, db.HasStateSummary(ctx, junkRoot), false)
require.NoError(t, db.deleteStateSummary(junkRoot)) // Delete bad summary or db will throw an error on test cleanup.
}
func TestStateSummary_CacheToDB_UsesProvidedBlockCache(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
summaries := make([]*ethpb.StateSummary, stateSummaryCachePruneCount-1)
bCache := make(map[[32]byte]interfaces.SignedBeaconBlock, stateSummaryCachePruneCount-1)
for i := range summaries {
b := util.NewBeaconBlock()
b.Block.Slot = types.Slot(i)
b.Block.Body.Graffiti = bytesutil.PadTo([]byte{byte(i)}, 32)
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
r, err := wsb.Block().HashTreeRoot()
require.NoError(t, err)
bCache[r] = wsb
summaries[i] = &ethpb.StateSummary{Slot: types.Slot(i), Root: r[:]}
}
// Simulated initial sync block cache.
pendingBlocksFn := func(r [32]byte) (interfaces.SignedBeaconBlock, error) {
b, ok := bCache[r]
if !ok {
return nil, ErrNotFoundBlock
}
return b, nil
}
require.NoError(t, db.SaveStateSummariesWithPendingBlocks(context.Background(), summaries, pendingBlocksFn))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount-1)
b := util.NewBeaconBlock()
b.Block.Slot = types.Slot(1000)
r, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveStateSummariesWithPendingBlocks(context.Background(), []*ethpb.StateSummary{{Slot: 1000, Root: r[:]}}, pendingBlocksFn))
require.Equal(t, db.stateSummaryCache.len(), stateSummaryCachePruneCount)
// Next insertion causes the buffer to flush.
b = util.NewBeaconBlock()
b.Block.Slot = 1001
r, err = util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveStateSummariesWithPendingBlocks(context.Background(), []*ethpb.StateSummary{{Slot: 1001, Root: r[:]}}, pendingBlocksFn))
require.Equal(t, db.stateSummaryCache.len(), 1)
// All blocks in the bCache should have been saved.
for r := range bCache {
require.Equal(t, true, db.HasBlock(ctx, r), "Block %#x was not saved to db", r)
}
}

View File

@@ -24,12 +24,18 @@ var (
configMismatchLog = "Configuration mismatch between your execution client and Prysm. " +
"Please check your execution client and restart it with the proper configuration. If this is not done, " +
"your node will not be able to complete the proof-of-stake transition"
needsEnginePortLog = "Could not check execution client configuration. " +
"You are probably connecting to your execution client on the wrong port. For the Ethereum " +
"merge, you will need to connect to your " +
"execution client on port 8551 rather than 8545. This is known as the 'engine API' port and needs to be " +
"authenticated if connecting via HTTP. See our documentation on how to set up this up here " +
"https://docs.prylabs.network/docs/execution-node/authentication"
)
// Checks the transition configuration between Prysm and the connected execution node to ensure
// there are no differences in terminal block difficulty and block hash.
// If there are any discrepancies, we must log errors to ensure users can resolve
//the problem and be ready for the merge transition.
// the problem and be ready for the merge transition.
func (s *Service) checkTransitionConfiguration(
ctx context.Context, blockNotifications chan *feed.Event,
) {
@@ -48,10 +54,14 @@ func (s *Service) checkTransitionConfiguration(
}
err := s.ExchangeTransitionConfiguration(ctx, cfg)
if err != nil {
if errors.Is(err, ErrConfigMismatch) {
switch {
case errors.Is(err, ErrConfigMismatch):
log.WithError(err).Fatal(configMismatchLog)
case errors.Is(err, ErrMethodNotFound):
log.WithError(err).Error(needsEnginePortLog)
default:
log.WithError(err).Error("Could not check configuration values between execution and consensus client")
}
log.WithError(err).Error("Could not check configuration values between execution and consensus client")
}
// We poll the execution client to see if the transition configuration has changed.
@@ -115,6 +125,9 @@ func (s *Service) handleExchangeConfigurationError(err error) {
s.runError = err
log.WithError(err).Error(configMismatchLog)
return
} else if errors.Is(err, ErrMethodNotFound) {
log.WithError(err).Error(needsEnginePortLog)
return
}
log.WithError(err).Error("Could not check configuration values between execution and consensus client")
}

View File

@@ -537,22 +537,31 @@ func handleRPCError(err error) error {
}
switch e.ErrorCode() {
case -32700:
errParseCount.Inc()
return ErrParse
case -32600:
errInvalidRequestCount.Inc()
return ErrInvalidRequest
case -32601:
errMethodNotFoundCount.Inc()
return ErrMethodNotFound
case -32602:
errInvalidParamsCount.Inc()
return ErrInvalidParams
case -32603:
errInternalCount.Inc()
return ErrInternal
case -38001:
errUnknownPayloadCount.Inc()
return ErrUnknownPayload
case -38002:
errInvalidForkchoiceStateCount.Inc()
return ErrInvalidForkchoiceState
case -38003:
errInvalidPayloadAttributesCount.Inc()
return ErrInvalidPayloadAttributes
case -32000:
errServerErrorCount.Inc()
// Only -32000 status codes are data errors in the RPC specification.
errWithData, ok := err.(rpc.DataError)
if !ok {

View File

@@ -31,6 +31,42 @@ var (
Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000},
},
)
errParseCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_parse_error_count",
Help: "The number of errors that occurred while parsing execution payload",
})
errInvalidRequestCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_invalid_request_count",
Help: "The number of errors that occurred due to invalid request",
})
errMethodNotFoundCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_method_not_found_count",
Help: "The number of errors that occurred due to method not found",
})
errInvalidParamsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_invalid_params_count",
Help: "The number of errors that occurred due to invalid params",
})
errInternalCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_internal_error_count",
Help: "The number of errors that occurred due to internal error",
})
errUnknownPayloadCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_unknown_payload_count",
Help: "The number of errors that occurred due to unknown payload",
})
errInvalidForkchoiceStateCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_invalid_forkchoice_state_count",
Help: "The number of errors that occurred due to invalid forkchoice state",
})
errInvalidPayloadAttributesCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_invalid_payload_attributes_count",
Help: "The number of errors that occurred due to invalid payload attributes",
})
errServerErrorCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_server_error_count",
Help: "The number of errors that occurred due to server error",
})
reconstructedExecutionPayloadCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "reconstructed_execution_payload_count",
Help: "Count the number of execution payloads that are reconstructed using JSON-RPC from payload headers",

View File

@@ -38,7 +38,13 @@ func (s *Service) setupExecutionClientConnections(ctx context.Context, currEndpo
// Ensure we have the correct chain and deposit IDs.
if err := ensureCorrectExecutionChain(ctx, fetcher); err != nil {
client.Close()
return errors.Wrap(err, "could not make initial request to verify execution chain ID")
errStr := err.Error()
if strings.Contains(errStr, "401 Unauthorized") {
errStr = "could not verify execution chain ID as your connection is not authenticated. " +
"If connecting to your execution client via HTTP, you will need to set up JWT authentication. " +
"See our documentation here https://docs.prylabs.network/docs/execution-node/authentication"
}
return errors.Wrap(err, errStr)
}
s.updateConnectedETH1(true)
s.runError = nil

View File

@@ -66,10 +66,6 @@ var (
logThreshold = 8
// period to log chainstart related information
logPeriod = 1 * time.Minute
// threshold of how old we will accept an eth1 node's head to be.
eth1Threshold = 20 * time.Minute
// error when eth1 node is too far behind.
errFarBehind = errors.Errorf("eth1 head is more than %s behind from current wall clock time", eth1Threshold.String())
)
// ChainStartFetcher retrieves information pertaining to the chain start event
@@ -604,11 +600,6 @@ func (s *Service) run(done <-chan struct{}) {
log.WithError(err).Debug("Could not fetch latest eth1 header")
continue
}
if eth1HeadIsBehind(head.Time) {
s.pollConnectionStatus(s.ctx)
log.WithError(errFarBehind).Debug("Could not get an up to date eth1 header")
continue
}
s.processBlockHeader(head)
s.handleETH1FollowDistance()
case <-chainstartTicker.C:
@@ -834,11 +825,3 @@ func dedupEndpoints(endpoints []string) []string {
}
return newEndpoints
}
// Checks if the provided timestamp is beyond the prescribed bound from
// the current wall clock time.
func eth1HeadIsBehind(timestamp uint64) bool {
timeout := prysmTime.Now().Add(-eth1Threshold)
// check that web3 client is syncing
return time.Unix(int64(timestamp), 0).Before(timeout) // lint:ignore uintcast -- timestamp will not exceed int64 in your lifetime.
}

View File

@@ -146,7 +146,7 @@ func TestStart_OK(t *testing.T) {
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
)
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
require.NoError(t, err, "unable to setup execution service")
web3Service = setDefaultMocks(web3Service)
web3Service.rpcClient = &mockExecution.RPCClient{Backend: testAcc.Backend}
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
@@ -156,7 +156,7 @@ func TestStart_OK(t *testing.T) {
web3Service.Start()
if len(hook.Entries) > 0 {
msg := hook.LastEntry().Message
want := "Could not connect to ETH1.0 chain RPC client"
want := "Could not connect to execution endpoint"
if strings.Contains(want, msg) {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
@@ -473,6 +473,7 @@ func TestInitDepositCacheWithFinalization_OK(t *testing.T) {
emptyState, err := util.NewBeaconState()
require.NoError(t, err)
util.SaveBlock(t, context.Background(), s.cfg.beaconDB, headBlock)
require.NoError(t, s.cfg.beaconDB.SaveGenesisBlockRoot(context.Background(), headRoot))
require.NoError(t, s.cfg.beaconDB.SaveState(context.Background(), emptyState, headRoot))
require.NoError(t, stateGen.SaveState(context.Background(), headRoot, emptyState))
@@ -752,15 +753,6 @@ func TestService_ValidateDepositContainers(t *testing.T) {
}
}
func TestTimestampIsChecked(t *testing.T) {
timestamp := uint64(time.Now().Unix())
assert.Equal(t, false, eth1HeadIsBehind(timestamp))
// Give an older timestmap beyond threshold.
timestamp = uint64(time.Now().Add(-eth1Threshold).Add(-1 * time.Minute).Unix())
assert.Equal(t, true, eth1HeadIsBehind(timestamp))
}
func TestETH1Endpoints(t *testing.T) {
server, firstEndpoint, err := mockExecution.SetupRPCServer()
require.NoError(t, err)

View File

@@ -64,6 +64,7 @@ go_test(
"//beacon-chain/forkchoice/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/v3:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -885,6 +885,15 @@ func (s *Store) viableForHead(node *Node) bool {
// It's also viable if we are in genesis epoch.
justified := s.justifiedCheckpoint.Epoch == node.justifiedEpoch || s.justifiedCheckpoint.Epoch == 0
finalized := s.finalizedCheckpoint.Epoch == node.finalizedEpoch || s.finalizedCheckpoint.Epoch == 0
if features.Get().EnableDefensivePull {
currentEpoch := slots.EpochsSinceGenesis(time.Unix(int64(s.genesisTime), 0))
if !justified && s.justifiedCheckpoint.Epoch+1 == currentEpoch {
if node.unrealizedJustifiedEpoch+1 >= currentEpoch {
justified = true
finalized = true
}
}
}
return justified && finalized
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice"
forkchoicetypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
@@ -887,6 +888,43 @@ func TestStore_ViableForHead(t *testing.T) {
}
}
func TestStore_ViableForHead_DefensivePull(t *testing.T) {
resetCfg := features.InitWithReset(&features.Flags{
EnableDefensivePull: true,
})
defer resetCfg()
tests := []struct {
n *Node
justifiedEpoch types.Epoch
finalizedEpoch types.Epoch
currentEpoch types.Epoch
want bool
}{
{&Node{}, 0, 0, 0, true},
{&Node{}, 1, 0, 1, false},
{&Node{}, 0, 1, 1, false},
{&Node{finalizedEpoch: 1, justifiedEpoch: 1}, 1, 1, 1, true},
{&Node{finalizedEpoch: 1, justifiedEpoch: 1}, 2, 2, 2, false},
{&Node{finalizedEpoch: 3, justifiedEpoch: 4}, 4, 3, 3, true},
{&Node{unrealizedFinalizedEpoch: 3, unrealizedJustifiedEpoch: 4}, 3, 2, 4, true},
{&Node{unrealizedFinalizedEpoch: 2, unrealizedJustifiedEpoch: 3}, 3, 2, 4, true},
{&Node{unrealizedFinalizedEpoch: 1, unrealizedJustifiedEpoch: 2}, 3, 2, 4, false},
}
for _, tc := range tests {
jc := &forkchoicetypes.Checkpoint{Epoch: tc.justifiedEpoch}
fc := &forkchoicetypes.Checkpoint{Epoch: tc.finalizedEpoch}
currentTime := uint64(time.Now().Unix())
driftSeconds := uint64(params.BeaconConfig().SlotsPerEpoch) * params.BeaconConfig().SecondsPerSlot
s := &Store{
justifiedCheckpoint: jc,
finalizedCheckpoint: fc,
genesisTime: currentTime - driftSeconds*uint64(tc.currentEpoch),
}
assert.Equal(t, tc.want, s.viableForHead(tc.n))
}
}
func TestStore_HasParent(t *testing.T) {
tests := []struct {
m map[[32]byte]uint64

View File

@@ -53,6 +53,7 @@ go_test(
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",

View File

@@ -3,6 +3,7 @@ package monitor
import (
"bytes"
"context"
"fmt"
"testing"
"github.com/prysmaticlabs/go-bitfield"
@@ -33,7 +34,7 @@ func TestGetAttestingIndices(t *testing.T) {
func TestProcessIncludedAttestationTwoTracked(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
require.NoError(t, state.SetSlot(2))
require.NoError(t, state.SetCurrentParticipationBits(bytes.Repeat([]byte{0xff}, 13)))
@@ -66,7 +67,7 @@ func TestProcessUnaggregatedAttestationStateNotCached(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
s := setupService(t)
s, _ := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
require.NoError(t, state.SetSlot(2))
header := state.LatestBlockHeader()
@@ -98,13 +99,13 @@ func TestProcessUnaggregatedAttestationStateCached(t *testing.T) {
ctx := context.Background()
hook := logTest.NewGlobal()
s := setupService(t)
s, db := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
require.NoError(t, state.SetCurrentParticipationBits(participation))
root := [32]byte{}
copy(root[:], "hello-world")
root, err := util.SaveBlock(t, ctx, db, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
@@ -124,8 +125,9 @@ func TestProcessUnaggregatedAttestationStateCached(t *testing.T) {
}
require.NoError(t, s.config.StateGen.SaveState(ctx, root, state))
s.processUnaggregatedAttestation(context.Background(), att)
wanted1 := "\"Processed unaggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor"
wanted2 := "\"Processed unaggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor"
rootStr := fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))
wanted1 := fmt.Sprintf("\"Processed unaggregated attestation\" Head=%s Slot=1 Source=%s Target=%s ValidatorIndex=2 prefix=monitor", rootStr, rootStr, rootStr)
wanted2 := fmt.Sprintf("\"Processed unaggregated attestation\" Head=%s Slot=1 Source=%s Target=%s ValidatorIndex=12 prefix=monitor", rootStr, rootStr, rootStr)
require.LogsContain(t, hook, wanted1)
require.LogsContain(t, hook, wanted2)
}
@@ -135,7 +137,7 @@ func TestProcessAggregatedAttestationStateNotCached(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
s := setupService(t)
s, _ := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
require.NoError(t, state.SetSlot(2))
header := state.LatestBlockHeader()
@@ -170,13 +172,13 @@ func TestProcessAggregatedAttestationStateNotCached(t *testing.T) {
func TestProcessAggregatedAttestationStateCached(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
s := setupService(t)
s, db := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
require.NoError(t, state.SetCurrentParticipationBits(participation))
root := [32]byte{}
copy(root[:], "hello-world")
root, err := util.SaveBlock(t, ctx, db, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
att := &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 2,
@@ -200,14 +202,15 @@ func TestProcessAggregatedAttestationStateCached(t *testing.T) {
require.NoError(t, s.config.StateGen.SaveState(ctx, root, state))
s.processAggregatedAttestation(ctx, att)
require.LogsContain(t, hook, "\"Processed attestation aggregation\" AggregatorIndex=2 BeaconBlockRoot=0x68656c6c6f2d Slot=1 SourceRoot=0x68656c6c6f2d TargetRoot=0x68656c6c6f2d prefix=monitor")
require.LogsContain(t, hook, "\"Processed aggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor")
require.LogsDoNotContain(t, hook, "\"Processed aggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor")
rootStr := fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))
require.LogsContain(t, hook, fmt.Sprintf("\"Processed attestation aggregation\" AggregatorIndex=2 BeaconBlockRoot=%s Slot=1 SourceRoot=%s TargetRoot=%s prefix=monitor", rootStr, rootStr, rootStr))
require.LogsContain(t, hook, fmt.Sprintf("\"Processed aggregated attestation\" Head=%s Slot=1 Source=%s Target=%s ValidatorIndex=2 prefix=monitor", rootStr, rootStr, rootStr))
require.LogsDoNotContain(t, hook, fmt.Sprintf("\"Processed aggregated attestation\" Head=%s Slot=1 Source=%s Target=%s ValidatorIndex=12 prefix=monitor", rootStr, rootStr, rootStr))
}
func TestProcessAttestations(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
ctx := context.Background()
state, _ := util.DeterministicGenesisStateAltair(t, 256)
require.NoError(t, state.SetSlot(2))

View File

@@ -168,7 +168,7 @@ func TestProcessProposedBlock(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
beaconState, _ := util.DeterministicGenesisState(t, 256)
root := [32]byte{}
copy(root[:], "hello-world")
@@ -199,7 +199,7 @@ func TestProcessBlock_AllEventsTrackedVals(t *testing.T) {
genConfig.FullSyncAggregate = true
b, err := util.GenerateFullBlockAltair(genesis, keys, genConfig, 1)
require.NoError(t, err)
s := setupService(t)
s, db := setupService(t)
pubKeys := make([][]byte, 3)
pubKeys[0] = genesis.Validators()[0].PublicKey
@@ -223,7 +223,7 @@ func TestProcessBlock_AllEventsTrackedVals(t *testing.T) {
s.RUnlock()
s.updateSyncCommitteeTrackedVals(genesis)
root, err := b.GetBlock().HashTreeRoot()
root, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, s.config.StateGen.SaveState(ctx, root, genesis))
wanted1 := fmt.Sprintf("\"Proposed beacon block was included\" BalanceChange=100000000 BlockRoot=%#x NewBalance=32000000000 ParentRoot=0xf732eaeb7fae ProposerIndex=15 Slot=1 Version=1 prefix=monitor", bytesutil.Trunc(root[:]))
@@ -241,7 +241,7 @@ func TestProcessBlock_AllEventsTrackedVals(t *testing.T) {
func TestLogAggregatedPerformance(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
s.logAggregatedPerformance()
time.Sleep(3000 * time.Millisecond)

View File

@@ -13,7 +13,7 @@ import (
func TestProcessSyncCommitteeContribution(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
contrib := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
@@ -28,7 +28,7 @@ func TestProcessSyncCommitteeContribution(t *testing.T) {
func TestProcessSyncAggregate(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
beaconState, _ := util.DeterministicGenesisStateAltair(t, 256)
block := &ethpb.BeaconBlockAltair{

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db/iface"
testDB "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
@@ -22,7 +23,7 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test"
)
func setupService(t *testing.T) *Service {
func setupService(t *testing.T) (*Service, iface.Database) {
beaconDB := testDB.SetupDB(t)
state, _ := util.DeterministicGenesisStateAltair(t, 256)
@@ -100,7 +101,7 @@ func setupService(t *testing.T) *Service {
aggregatedPerformance: aggregatedPerformance,
trackedSyncCommitteeIndices: trackedSyncCommitteeIndices,
lastSyncedEpoch: 0,
}
}, beaconDB
}
func TestTrackedIndex(t *testing.T) {
@@ -116,7 +117,7 @@ func TestTrackedIndex(t *testing.T) {
func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
state, _ := util.DeterministicGenesisStateAltair(t, 1024)
s.updateSyncCommitteeTrackedVals(state)
@@ -138,7 +139,7 @@ func TestNewService(t *testing.T) {
func TestStart(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
stateChannel := make(chan *feed.Event, 1)
stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
@@ -180,7 +181,7 @@ func TestStart(t *testing.T) {
func TestInitializePerformanceStructures(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
s := setupService(t)
s, _ := setupService(t)
state, err := s.config.HeadFetcher.HeadState(ctx)
require.NoError(t, err)
epoch := slots.ToEpoch(state.Slot())
@@ -222,7 +223,7 @@ func TestInitializePerformanceStructures(t *testing.T) {
func TestMonitorRoutine(t *testing.T) {
ctx := context.Background()
hook := logTest.NewGlobal()
s := setupService(t)
s, db := setupService(t)
stateChannel := make(chan *feed.Event, 1)
stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
@@ -242,7 +243,7 @@ func TestMonitorRoutine(t *testing.T) {
genConfig := util.DefaultBlockGenConfig()
block, err := util.GenerateFullBlockAltair(genesis, keys, genConfig, 1)
require.NoError(t, err)
root, err := block.GetBlock().HashTreeRoot()
root, err := util.SaveBlock(t, ctx, db, block).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, s.config.StateGen.SaveState(ctx, root, genesis))
@@ -266,7 +267,7 @@ func TestMonitorRoutine(t *testing.T) {
}
func TestWaitForSync(t *testing.T) {
s := setupService(t)
s, _ := setupService(t)
stateChannel := make(chan *feed.Event, 1)
stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
@@ -290,7 +291,7 @@ func TestWaitForSync(t *testing.T) {
func TestRun(t *testing.T) {
hook := logTest.NewGlobal()
s := setupService(t)
s, _ := setupService(t)
stateChannel := make(chan *feed.Event, 1)
stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)

View File

@@ -1,20 +1,39 @@
package p2p
import (
"strings"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
knownAgentVersions = []string{
"lighthouse",
"nimbus",
"prysm",
"teku",
"js-libp2p",
"rust-libp2p",
}
p2pPeerCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "p2p_peer_count",
Help: "The number of peers in a given state.",
},
[]string{"state"})
totalPeerCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "libp2p_peers",
Help: "Tracks the total number of libp2p peers",
})
connectedPeersCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers",
Help: "Tracks the total number of connected libp2p peers by agent string",
},
[]string{"agent"},
)
avgScoreConnectedClients = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers_average_scores",
Help: "Tracks the overall p2p scores of connected libp2p peers by agent string",
},
[]string{"agent"},
)
repeatPeerConnections = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_repeat_attempts",
Help: "The number of repeat attempts the connection handler is triggered for a peer.",
@@ -46,10 +65,60 @@ var (
)
func (s *Service) updateMetrics() {
totalPeerCount.Set(float64(len(s.peers.Connected())))
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(s.peers.Connected())))
connectedPeers := s.peers.Connected()
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(connectedPeers)))
p2pPeerCount.WithLabelValues("Disconnected").Set(float64(len(s.peers.Disconnected())))
p2pPeerCount.WithLabelValues("Connecting").Set(float64(len(s.peers.Connecting())))
p2pPeerCount.WithLabelValues("Disconnecting").Set(float64(len(s.peers.Disconnecting())))
p2pPeerCount.WithLabelValues("Bad").Set(float64(len(s.peers.Bad())))
store := s.Host().Peerstore()
numConnectedPeersByClient := make(map[string]float64)
peerScoresByClient := make(map[string][]float64)
for i := 0; i < len(connectedPeers); i++ {
p := connectedPeers[i]
pid, err := peer.Decode(p.String())
if err != nil {
log.WithError(err).Debug("Could not decode peer string")
continue
}
// Get the agent data.
rawAgent, err := store.Get(pid, "AgentVersion")
agent, ok := rawAgent.(string)
if err != nil || !ok {
agent = "unknown"
}
foundName := "unknown"
for _, knownAgent := range knownAgentVersions {
// If the agent string matches one of our known agents, we set
// the value to our own, sanitized string.
if strings.Contains(strings.ToLower(agent), knownAgent) {
foundName = knownAgent
}
}
numConnectedPeersByClient[foundName] += 1
// Get peer scoring data.
overallScore := s.peers.Scorers().Score(pid)
peerScoresByClient[foundName] = append(peerScoresByClient[foundName], overallScore)
}
for agent, total := range numConnectedPeersByClient {
connectedPeersCount.WithLabelValues(agent).Set(total)
}
for agent, scoringData := range peerScoresByClient {
avgScore := average(scoringData)
avgScoreConnectedClients.WithLabelValues(agent).Set(avgScore)
}
}
func average(xs []float64) float64 {
if len(xs) == 0 {
return 0
}
total := 0.0
for _, v := range xs {
total += v
}
return total / float64(len(xs))
}

View File

@@ -245,9 +245,7 @@ func (s *Service) Start() {
})
async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
async.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
async.RunEvery(s.ctx, refreshRate, func() {
s.RefreshENR()
})
async.RunEvery(s.ctx, refreshRate, s.RefreshENR)
async.RunEvery(s.ctx, 1*time.Minute, func() {
log.WithFields(logrus.Fields{
"inbound": len(s.peers.InboundConnected()),

View File

@@ -501,8 +501,12 @@ func TestServer_ListIndexedAttestations_GenesisEpoch(t *testing.T) {
db := dbTest.SetupDB(t)
helpers.ClearCache()
ctx := context.Background()
targetRoot1 := bytesutil.ToBytes32([]byte("root"))
targetRoot2 := bytesutil.ToBytes32([]byte("root2"))
targetRoot1, err := util.SaveBlock(t, ctx, db, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
b := util.NewBeaconBlock()
b.Block.ParentRoot = targetRoot1[:]
targetRoot2, err := util.SaveBlock(t, ctx, db, b).Block().HashTreeRoot()
require.NoError(t, err)
count := params.BeaconConfig().SlotsPerEpoch
atts := make([]*ethpb.Attestation, 0, count)
@@ -571,7 +575,7 @@ func TestServer_ListIndexedAttestations_GenesisEpoch(t *testing.T) {
HeadFetcher: &chainMock.ChainService{State: state},
StateGen: stategen.New(db),
}
err := db.SaveStateSummary(ctx, &ethpb.StateSummary{
err = db.SaveStateSummary(ctx, &ethpb.StateSummary{
Root: targetRoot1[:],
Slot: 1,
})
@@ -610,7 +614,8 @@ func TestServer_ListIndexedAttestations_OldEpoch(t *testing.T) {
helpers.ClearCache()
ctx := context.Background()
blockRoot := bytesutil.ToBytes32([]byte("root"))
blockRoot, err := util.SaveBlock(t, ctx, db, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
count := params.BeaconConfig().SlotsPerEpoch
atts := make([]*ethpb.Attestation, 0, count)
epoch := types.Epoch(50)

View File

@@ -1542,12 +1542,15 @@ func TestServer_GetValidatorParticipation_CurrentAndPrevEpoch(t *testing.T) {
b.Block.Slot = 16
util.SaveBlock(t, ctx, beaconDB, b)
bRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
bRoot2, err := util.SaveBlock(t, ctx, beaconDB, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bRoot[:]}))
require.NoError(t, beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: params.BeaconConfig().ZeroHash[:]}))
require.NoError(t, beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: bRoot2[:]}))
require.NoError(t, beaconDB.SaveGenesisBlockRoot(ctx, bRoot))
require.NoError(t, err)
require.NoError(t, beaconDB.SaveState(ctx, headState, bRoot))
require.NoError(t, beaconDB.SaveState(ctx, headState, params.BeaconConfig().ZeroHash))
require.NoError(t, beaconDB.SaveState(ctx, headState, bRoot2))
m := &mock.ChainService{State: headState}
offset := int64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))

View File

@@ -58,7 +58,9 @@ func TestServer_GetAttestationInclusionSlot(t *testing.T) {
}
s, _ := util.DeterministicGenesisState(t, 2048)
tr := [32]byte{'a'}
b := util.NewBeaconBlock()
tr, err := util.SaveBlock(t, ctx, bs.BeaconDB, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, bs.StateGen.SaveState(ctx, tr, s))
c, err := helpers.BeaconCommitteeFromState(context.Background(), s, 1, 0)
require.NoError(t, err)
@@ -73,7 +75,7 @@ func TestServer_GetAttestationInclusionSlot(t *testing.T) {
AggregationBits: bitfield.Bitlist{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01},
Signature: make([]byte, fieldparams.BLSSignatureLength),
}
b := util.NewBeaconBlock()
b = util.NewBeaconBlock()
b.Block.Slot = 2
b.Block.Body.Attestations = []*ethpb.Attestation{a}
util.SaveBlock(t, ctx, bs.BeaconDB, b)

View File

@@ -53,6 +53,7 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -127,9 +127,8 @@ func (vs *Server) getPayloadHeaderFromBuilder(ctx context.Context, slot types.Sl
return nil, errors.New("builder returned nil bid")
}
v := bid.Message.Value
if new(big.Int).SetBytes(bytesutil.ReverseByteOrder(v)).String() == "0" {
v := new(big.Int).SetBytes(bytesutil.ReverseByteOrder(bid.Message.Value))
if v.String() == "0" {
return nil, errors.New("builder returned header with 0 bid amount")
}
@@ -159,7 +158,7 @@ func (vs *Server) getPayloadHeaderFromBuilder(ctx context.Context, slot types.Sl
}
log.WithFields(logrus.Fields{
"bid": bytesutil.BytesToUint64BigEndian(bid.Message.Value),
"value": v.String(),
"builderPubKey": fmt.Sprintf("%#x", bid.Message.Pubkey),
"blockHash": fmt.Sprintf("%#x", bid.Message.Header.BlockHash),
}).Info("Received header with bid")

View File

@@ -8,6 +8,7 @@ import (
fastssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/crypto/hash"
@@ -106,6 +107,9 @@ func (vs *Server) canonicalEth1Data(
canonicalEth1Data = beaconState.Eth1Data()
eth1BlockHash = bytesutil.ToBytes32(beaconState.Eth1Data().BlockHash)
}
if features.Get().DisableStakinContractCheck && eth1BlockHash == [32]byte{} {
return canonicalEth1Data, new(big.Int).SetInt64(0), nil
}
_, canonicalEth1DataHeight, err := vs.Eth1BlockFetcher.BlockExists(ctx, eth1BlockHash)
if err != nil {
return nil, nil, errors.Wrap(err, "could not fetch eth1data height")

View File

@@ -123,6 +123,9 @@ func (vs *Server) ValidatorIndex(ctx context.Context, req *ethpb.ValidatorIndexR
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not determine head state: %v", err)
}
if st == nil || st.IsNil() {
return nil, status.Errorf(codes.Internal, "head state is empty")
}
index, ok := st.ValidatorIndexByPubkey(bytesutil.ToBytes48(req.PublicKey))
if !ok {
return nil, status.Errorf(codes.NotFound, "Could not find validator index for public key %#x", req.PublicKey)

View File

@@ -48,6 +48,18 @@ func TestValidatorIndex_OK(t *testing.T) {
assert.NoError(t, err, "Could not get validator index")
}
func TestValidatorIndex_StateEmpty(t *testing.T) {
Server := &Server{
HeadFetcher: &mockChain.ChainService{},
}
pubKey := pubKey(1)
req := &ethpb.ValidatorIndexRequest{
PublicKey: pubKey,
}
_, err := Server.ValidatorIndex(context.Background(), req)
assert.ErrorContains(t, "head state is empty", err)
}
func TestWaitForActivation_ContextClosed(t *testing.T) {
beaconState, err := v1.InitializeFromProto(&ethpb.BeaconState{
Slot: 0,

View File

@@ -67,7 +67,8 @@ func Test_processQueuedBlocks_DetectsDoubleProposals(t *testing.T) {
blksQueue: newBlocksQueue(),
}
parentRoot := bytesutil.ToBytes32([]byte("parent"))
parentRoot, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
require.NoError(t, err)

View File

@@ -11,7 +11,6 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/crypto/bls"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
@@ -157,7 +156,8 @@ func TestService_processProposerSlashings(t *testing.T) {
},
}
parentRoot := bytesutil.ToBytes32([]byte("parent"))
parentRoot, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
require.NoError(t, err)

View File

@@ -62,7 +62,9 @@ func TestStateByRootIfCachedNoCopy_HotState(t *testing.T) {
service := New(beaconDB)
beaconState, _ := util.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
b := util.NewBeaconBlock()
r, err := util.SaveBlock(t, ctx, beaconDB, b).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r[:]}))
service.hotStateCache.put(r, beaconState)
@@ -102,6 +104,7 @@ func TestStateByRoot_HotStateUsingEpochBoundaryCacheNoReplay(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(10))
blk := util.NewBeaconBlock()
util.SaveBlock(t, ctx, beaconDB, blk)
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: blkRoot[:]}))
@@ -143,7 +146,8 @@ func TestStateByRoot_HotStateCached(t *testing.T) {
service := New(beaconDB)
beaconState, _ := util.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r[:]}))
service.hotStateCache.put(r, beaconState)
@@ -223,7 +227,8 @@ func TestStateByRootInitialSync_UseCache(t *testing.T) {
service := New(beaconDB)
beaconState, _ := util.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: r[:]}))
service.hotStateCache.put(r, beaconState)

View File

@@ -13,4 +13,16 @@ var (
Buckets: []float64{64, 256, 1024, 2048, 4096},
},
)
replayBlocksSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "replay_blocks_milliseconds",
Help: "Time it took to replay blocks",
},
)
replayToSlotSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "replay_to_slot_milliseconds",
Help: "Time it took to replay to slot",
},
)
)

View File

@@ -119,6 +119,7 @@ func (rs *stateReplayer) ReplayBlocks(ctx context.Context) (state.BeaconState, e
log.WithFields(logrus.Fields{
"duration": duration,
}).Debug("Finished calling process_blocks on all blocks in ReplayBlocks")
replayBlocksSummary.Observe(float64(duration.Milliseconds()))
return s, nil
}
@@ -150,14 +151,14 @@ func (rs *stateReplayer) ReplayToSlot(ctx context.Context, replayTo types.Slot)
// err will be handled after the bookend log
s, err = ReplayProcessSlots(ctx, s, replayTo)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("ReplayToSlot failed to seek to slot %d after applying blocks", replayTo))
}
duration := time.Since(start)
log.WithFields(logrus.Fields{
"duration": duration,
}).Debug("time spent in process_slots")
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("ReplayToSlot failed to seek to slot %d after applying blocks", replayTo))
}
replayToSlotSummary.Observe(float64(duration.Milliseconds()))
return s, nil
}

View File

@@ -74,13 +74,6 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, st stat
return nil
}
// Only on an epoch boundary slot, save epoch boundary state in epoch boundary root state cache.
if slots.IsEpochStart(st.Slot()) {
if err := s.epochBoundaryStateCache.put(blockRoot, st); err != nil {
return err
}
}
// On an intermediate slot, save state summary.
if err := s.beaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{
Slot: st.Slot(),
@@ -89,6 +82,13 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, st stat
return err
}
// Only on an epoch boundary slot, save epoch boundary state in epoch boundary root state cache.
if slots.IsEpochStart(st.Slot()) {
if err := s.epochBoundaryStateCache.put(blockRoot, st); err != nil {
return err
}
}
// Store the copied state in the hot state cache.
s.hotStateCache.put(blockRoot, st)

View File

@@ -6,6 +6,7 @@ import (
testDB "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
@@ -22,7 +23,8 @@ func TestSaveState_HotStateCanBeSaved(t *testing.T) {
// This goes to hot section, verify it can save on epoch boundary.
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'a'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.SaveState(ctx, r, beaconState))
// Should save both state and state summary.
@@ -43,7 +45,8 @@ func TestSaveState_HotStateCached(t *testing.T) {
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
// Cache the state prior.
r := [32]byte{'a'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
service.hotStateCache.put(r, beaconState)
require.NoError(t, service.SaveState(ctx, r, beaconState))
@@ -61,7 +64,8 @@ func TestState_ForceCheckpoint_SavesStateToDatabase(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'a'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
svc.hotStateCache.put(r, beaconState)
require.Equal(t, false, beaconDB.HasState(ctx, r), "Database has state stored already")
@@ -80,7 +84,8 @@ func TestSaveState_Alreadyhas(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
// Pre cache the hot state.
service.hotStateCache.put(r, beaconState)
@@ -99,7 +104,8 @@ func TestSaveState_CanSaveOnEpochBoundary(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.saveStateByRoot(ctx, r, beaconState))
@@ -120,8 +126,10 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
b := util.NewBeaconBlock()
b.Block.Body.Graffiti = bytesutil.PadTo([]byte("foo"), 32)
util.SaveBlock(t, ctx, beaconDB, b)
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
@@ -145,7 +153,8 @@ func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(defaultHotStateDBInterval))
r := [32]byte{'A'}
r, err := util.SaveBlock(t, ctx, beaconDB, util.NewBeaconBlock()).Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.saveStateByRoot(ctx, r, beaconState))
require.LogsContain(t, hook, "Saving hot state to DB")

View File

@@ -131,7 +131,7 @@ func (s *Service) processFetchedData(
// Use Batch Block Verify to process and verify batches directly.
if err := s.processBatchedBlocks(ctx, genesis, data.blocks, s.cfg.Chain.ReceiveBlockBatch); err != nil {
log.WithError(err).Warn("Batch is not processed")
log.WithError(err).Warn("Skip processing batched blocks")
}
}
@@ -260,7 +260,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
headSlot := s.cfg.Chain.HeadSlot()
for headSlot >= firstBlock.Block().Slot() && s.isProcessedBlock(ctx, firstBlock, blkRoot) {
if len(blks) == 1 {
return errors.New("no good blocks in batch")
return fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, firstBlock.Block().Slot(), blkRoot, errBlockAlreadyProcessed)
}
blks = blks[1:]
firstBlock = blks[0]

View File

@@ -457,7 +457,7 @@ func TestService_processBlockBatch(t *testing.T) {
ctx context.Context, blocks []interfaces.SignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
assert.ErrorContains(t, "no good blocks in batch", err)
assert.ErrorContains(t, "block is already processed", err)
var badBatch2 []interfaces.SignedBeaconBlock
for i, b := range batch2 {

View File

@@ -89,6 +89,38 @@ var (
Buckets: []float64{250, 500, 1000, 1500, 2000, 4000, 8000, 16000},
},
)
// Attestation processing granular error tracking.
attBadBlockCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_block_total",
Help: "Increased when a gossip attestation references a bad block",
})
attBadLmdConsistencyCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_lmd_consistency_total",
Help: "Increased when a gossip attestation has bad LMD GHOST consistency",
})
attBadSelectionProofCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_selection_proof_total",
Help: "Increased when a gossip attestation has a bad selection proof",
})
attBadSignatureBatchCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "gossip_attestation_bad_signature_batch_total",
Help: "Increased when a gossip attestation has a bad signature batch",
})
// Attestation and block gossip verification performance.
aggregateAttestationVerificationGossipSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "gossip_aggregate_attestation_verification_milliseconds",
Help: "Time to verify gossiped attestations",
},
)
blockVerificationGossipSummary = promauto.NewSummary(
prometheus.SummaryOpts{
Name: "gossip_block_verification_milliseconds",
Help: "Time to verify gossiped blocks",
},
)
)
func (s *Service) updateMetrics() {

View File

@@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v3/time"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"go.opencensus.io/trace"
)
@@ -27,6 +28,7 @@ import (
// validateAggregateAndProof verifies the aggregated signature and the selection proof is valid before forwarding to the
// network and downstream services.
func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
@@ -75,8 +77,11 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
// Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation
// processing tolerance.
if err := helpers.ValidateAttestationTime(m.Message.Aggregate.Data.Slot, s.cfg.chain.GenesisTime(),
earlyAttestationProcessingTolerance); err != nil {
if err := helpers.ValidateAttestationTime(
m.Message.Aggregate.Data.Slot,
s.cfg.chain.GenesisTime(),
earlyAttestationProcessingTolerance,
); err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}
@@ -89,6 +94,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.BeaconBlockRoot)) ||
s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Target.Root)) ||
s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Source.Root)) {
attBadBlockCount.Inc()
return pubsub.ValidationReject, errors.New("bad block referenced in attestation data")
}
@@ -114,6 +120,8 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
msg.ValidatorData = m
aggregateAttestationVerificationGossipSummary.Observe(float64(prysmTime.Since(receivedTime).Milliseconds()))
return pubsub.ValidationAccept, nil
}
@@ -127,6 +135,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
// but it's invalid in the spirit of the protocol. Here we choose safety over profit.
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, signed.Message.Aggregate); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
return pubsub.ValidationReject, err
}
@@ -168,6 +177,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
if err != nil {
wrappedErr := errors.Wrapf(err, "Could not validate selection for validator %d", signed.Message.AggregatorIndex)
tracing.AnnotateError(span, wrappedErr)
attBadSelectionProofCount.Inc()
return pubsub.ValidationReject, wrappedErr
}

View File

@@ -124,6 +124,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if s.hasBadBlock(bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) ||
s.hasBadBlock(bytesutil.ToBytes32(att.Data.Target.Root)) ||
s.hasBadBlock(bytesutil.ToBytes32(att.Data.Source.Root)) {
attBadBlockCount.Inc()
return pubsub.ValidationReject, errors.New("attestation data references bad block root")
}
@@ -141,6 +142,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
return pubsub.ValidationReject, err
}
@@ -222,6 +224,7 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a *eth.A
set, err := blocks.AttestationSignatureBatch(ctx, bs, []*eth.Attestation{a})
if err != nil {
tracing.AnnotateError(span, err)
attBadSignatureBatchCount.Inc()
return pubsub.ValidationReject, err
}
return s.validateWithBatchVerifier(ctx, "attestation", set)

View File

@@ -204,6 +204,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
"proposerIndex": blk.Block().ProposerIndex(),
"graffiti": string(blk.Block().Body().Graffiti()),
}).Debug("Received block")
blockVerificationGossipSummary.Observe(float64(prysmTime.Since(receivedTime).Milliseconds()))
return pubsub.ValidationAccept, nil
}
@@ -253,16 +255,17 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk interfaces.Signed
// validateBellatrixBeaconBlock validates the block for the Bellatrix fork.
// spec code:
// If the execution is enabled for the block -- i.e. is_execution_enabled(state, block.body) then validate the following:
// [REJECT] The block's execution payload timestamp is correct with respect to the slot --
// i.e. execution_payload.timestamp == compute_timestamp_at_slot(state, block.slot).
//
// If exection_payload verification of block's parent by an execution node is not complete:
// [REJECT] The block's parent (defined by block.parent_root) passes all validation (excluding execution
// node verification of the block.body.execution_payload).
// otherwise:
// [IGNORE] The block's parent (defined by block.parent_root) passes all validation (including execution
// node verification of the block.body.execution_payload).
// If the execution is enabled for the block -- i.e. is_execution_enabled(state, block.body) then validate the following:
// [REJECT] The block's execution payload timestamp is correct with respect to the slot --
// i.e. execution_payload.timestamp == compute_timestamp_at_slot(state, block.slot).
//
// If exection_payload verification of block's parent by an execution node is not complete:
// [REJECT] The block's parent (defined by block.parent_root) passes all validation (excluding execution
// node verification of the block.body.execution_payload).
// otherwise:
// [IGNORE] The block's parent (defined by block.parent_root) passes all validation (including execution
// node verification of the block.body.execution_payload).
func (s *Service) validateBellatrixBeaconBlock(ctx context.Context, parentState state.BeaconState, blk interfaces.BeaconBlock) error {
// Error if block and state are not the same version
if parentState.Version() != blk.Version() {

View File

@@ -216,6 +216,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
ctx := context.Background()
beaconState, privKeys := util.DeterministicGenesisState(t, 100)
parentBlock := util.NewBeaconBlock()
util.SaveBlock(t, ctx, db, parentBlock)
bRoot, err := parentBlock.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, beaconState, bRoot))

View File

@@ -11,9 +11,11 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl",
visibility = ["//visibility:private"],
deps = [
"//cmd/prysmctl/checkpoint:go_default_library",
"//cmd/prysmctl/checkpointsync:go_default_library",
"//cmd/prysmctl/deprecated:go_default_library",
"//cmd/prysmctl/p2p:go_default_library",
"//cmd/prysmctl/testnet:go_default_library",
"//cmd/prysmctl/weaksubjectivity:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],

View File

@@ -3,11 +3,10 @@ load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"latest.go",
"save.go",
"cmd.go",
"download.go",
],
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/checkpoint",
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/checkpointsync",
visibility = ["//visibility:public"],
deps = [
"//api/client/beacon:go_default_library",

View File

@@ -0,0 +1,14 @@
package checkpointsync
import "github.com/urfave/cli/v2"
var Commands = []*cli.Command{
{
Name: "checkpoint-sync",
Aliases: []string{"cpt-sync"},
Usage: "commands for managing checkpoint sync",
Subcommands: []*cli.Command{
downloadCmd,
},
},
}

View File

@@ -1,4 +1,4 @@
package checkpoint
package checkpointsync
import (
"context"
@@ -10,37 +10,38 @@ import (
"github.com/urfave/cli/v2"
)
var saveFlags = struct {
var downloadFlags = struct {
BeaconNodeHost string
Timeout time.Duration
}{}
var saveCmd = &cli.Command{
Name: "save",
Usage: "Save the latest finalized header and the most recent block it integrates. To be used for checkpoint sync.",
Action: cliActionSave,
var downloadCmd = &cli.Command{
Name: "download",
Aliases: []string{"dl"},
Usage: "Download the latest finalized state and the most recent block it integrates. To be used for checkpoint sync.",
Action: cliActionDownload,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node connection",
Destination: &saveFlags.BeaconNodeHost,
Destination: &downloadFlags.BeaconNodeHost,
Value: "localhost:3500",
},
&cli.DurationFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 4m",
Destination: &saveFlags.Timeout,
Destination: &downloadFlags.Timeout,
Value: time.Minute * 4,
},
},
}
func cliActionSave(_ *cli.Context) error {
func cliActionDownload(_ *cli.Context) error {
ctx := context.Background()
f := saveFlags
f := downloadFlags
opts := []beacon.ClientOpt{beacon.WithTimeout(f.Timeout)}
client, err := beacon.NewClient(saveFlags.BeaconNodeHost, opts...)
client, err := beacon.NewClient(downloadFlags.BeaconNodeHost, opts...)
if err != nil {
return err
}

View File

@@ -0,0 +1,12 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["cmd.go"],
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/deprecated",
visibility = ["//visibility:public"],
deps = [
"//cmd/prysmctl/deprecated/checkpoint:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@@ -0,0 +1,13 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"latest.go",
"save.go",
],
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/deprecated/checkpoint",
visibility = ["//visibility:public"],
deps = ["@com_github_urfave_cli_v2//:go_default_library"],
)

View File

@@ -6,9 +6,9 @@ var Commands = []*cli.Command{
{
Name: "checkpoint",
Aliases: []string{"cpt"},
Usage: "commands for managing checkpoint syncing",
Usage: "deprecated",
Subcommands: []*cli.Command{
latestCmd,
checkpointCmd,
saveCmd,
},
},

View File

@@ -0,0 +1,17 @@
package checkpoint
import (
"fmt"
"github.com/urfave/cli/v2"
)
var checkpointCmd = &cli.Command{
Name: "latest",
Usage: "deprecated - please use 'prysmctl weak-subjectivity checkpoint' instead!",
Action: cliDeprecatedLatest,
}
func cliDeprecatedLatest(_ *cli.Context) error {
return fmt.Errorf("This command has moved. Please use 'prysmctl weak-subjectivity checkpoint' instead!")
}

View File

@@ -0,0 +1,17 @@
package checkpoint
import (
"fmt"
"github.com/urfave/cli/v2"
)
var saveCmd = &cli.Command{
Name: "save",
Usage: "deprecated - please use 'prysmctl checkpoint-sync download' instead!",
Action: cliActionDeprecatedSave,
}
func cliActionDeprecatedSave(_ *cli.Context) error {
return fmt.Errorf("This command has moved. Please use 'prysmctl checkpoint-sync download' instead!")
}

View File

@@ -0,0 +1,12 @@
package deprecated
import (
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/deprecated/checkpoint"
"github.com/urfave/cli/v2"
)
var Commands = []*cli.Command{}
func init() {
Commands = append(Commands, checkpoint.Commands...)
}

View File

@@ -3,9 +3,11 @@ package main
import (
"os"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/checkpoint"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/checkpointsync"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/deprecated"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/p2p"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/testnet"
"github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/weaksubjectivity"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
@@ -23,7 +25,12 @@ func main() {
}
func init() {
prysmctlCommands = append(prysmctlCommands, checkpoint.Commands...)
prysmctlCommands = append(prysmctlCommands, testnet.Commands...)
// contains the old checkpoint sync subcommands. these commands should display help/warn messages
// pointing to their new locations
prysmctlCommands = append(prysmctlCommands, deprecated.Commands...)
prysmctlCommands = append(prysmctlCommands, checkpointsync.Commands...)
prysmctlCommands = append(prysmctlCommands, p2p.Commands...)
prysmctlCommands = append(prysmctlCommands, testnet.Commands...)
prysmctlCommands = append(prysmctlCommands, weaksubjectivity.Commands...)
}

View File

@@ -0,0 +1,15 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"cmd.go",
],
importpath = "github.com/prysmaticlabs/prysm/v3/cmd/prysmctl/weaksubjectivity",
visibility = ["//visibility:public"],
deps = [
"//api/client/beacon:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@@ -1,4 +1,4 @@
package checkpoint
package weaksubjectivity
import (
"context"
@@ -9,37 +9,38 @@ import (
"github.com/urfave/cli/v2"
)
var latestFlags = struct {
var checkpointFlags = struct {
BeaconNodeHost string
Timeout time.Duration
}{}
var latestCmd = &cli.Command{
Name: "latest",
Usage: "Compute the latest weak subjectivity checkpoint (block_root:epoch) using trusted server data.",
Action: cliActionLatest,
var checkpointCmd = &cli.Command{
Name: "checkpoint",
Aliases: []string{"cpt"},
Usage: "Compute the latest weak subjectivity checkpoint (block_root:epoch) using trusted server data.",
Action: cliActionCheckpoint,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node to query",
Destination: &latestFlags.BeaconNodeHost,
Destination: &checkpointFlags.BeaconNodeHost,
Value: "http://localhost:3500",
},
&cli.DurationFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 2m",
Destination: &latestFlags.Timeout,
Destination: &checkpointFlags.Timeout,
Value: time.Minute * 2,
},
},
}
func cliActionLatest(_ *cli.Context) error {
func cliActionCheckpoint(_ *cli.Context) error {
ctx := context.Background()
f := latestFlags
f := checkpointFlags
opts := []beacon.ClientOpt{beacon.WithTimeout(f.Timeout)}
client, err := beacon.NewClient(latestFlags.BeaconNodeHost, opts...)
client, err := beacon.NewClient(checkpointFlags.BeaconNodeHost, opts...)
if err != nil {
return err
}

View File

@@ -0,0 +1,14 @@
package weaksubjectivity
import "github.com/urfave/cli/v2"
var Commands = []*cli.Command{
{
Name: "weak-subjectivity",
Aliases: []string{"ws"},
Usage: "commands dealing with weak subjectivity",
Subcommands: []*cli.Command{
checkpointCmd,
},
},
}

View File

@@ -69,6 +69,8 @@ type Flags struct {
EnableOnlyBlindedBeaconBlocks bool // EnableOnlyBlindedBeaconBlocks enables only storing blinded beacon blocks in the DB post-Bellatrix fork.
EnableStartOptimistic bool // EnableStartOptimistic treats every block as optimistic at startup.
DisableStakinContractCheck bool // Disables check for deposit contract when proposing blocks
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
@@ -214,7 +216,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(enableDefensivePull)
cfg.EnableDefensivePull = true
}
if ctx.Bool(disableStakinContractCheck.Name) {
logEnabled(disableStakinContractCheck)
cfg.DisableStakinContractCheck = true
}
if ctx.Bool(disableVecHTR.Name) {
logEnabled(disableVecHTR)
} else {

View File

@@ -90,6 +90,8 @@ var deprecatedFlags = []cli.Flag{
deprecatedFallbackProvider,
}
// deprecatedBeaconFlags contains flags that are still used by other components
// and therefore cannot be added to deprecatedFlags
var deprecatedBeaconFlags = []cli.Flag{
deprecatedBackupWebHookFlag,
}

View File

@@ -83,6 +83,10 @@ var (
"a foolproof method to find duplicate instances in the network. Your validator will still be" +
" vulnerable if it is being run in unsafe configurations.",
}
disableStakinContractCheck = &cli.BoolFlag{
Name: "disable-staking-contract-check",
Usage: "Disables checking of staking contract deposits when proposing blocks, useful for devnets",
}
enableHistoricalSpaceRepresentation = &cli.BoolFlag{
Name: "enable-historical-state-representation",
Usage: "Enables the beacon chain to save historical states in a space efficient manner." +

View File

@@ -24,8 +24,6 @@ var (
ErrNilObject = errors.New("received nil object")
// ErrNilSignedBeaconBlock is returned when a nil signed beacon block is received.
ErrNilSignedBeaconBlock = errors.New("signed beacon block can't be nil")
errNilBeaconBlock = errors.New("beacon block can't be nil")
errNilBeaconBlockBody = errors.New("beacon block body can't be nil")
)
// NewSignedBeaconBlock creates a signed beacon block from a protobuf signed beacon block.

View File

@@ -17,12 +17,6 @@ func BeaconBlockIsNil(b interfaces.SignedBeaconBlock) error {
if b == nil || b.IsNil() {
return ErrNilSignedBeaconBlock
}
if b.Block().IsNil() {
return errNilBeaconBlock
}
if b.Block().Body().IsNil() {
return errNilBeaconBlockBody
}
return nil
}

2
go.mod
View File

@@ -84,6 +84,7 @@ require (
go.etcd.io/bbolt v1.3.5
go.opencensus.io v0.23.0
go.uber.org/automaxprocs v1.3.0
go.uber.org/multierr v1.8.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
@@ -221,7 +222,6 @@ require (
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect

View File

@@ -75,6 +75,9 @@ func handleConditionalExpression(exp *ast.BinaryExpr, pass *analysis.Pass) {
return
}
for _, iface := range selectedInterfaces {
if strings.HasPrefix(typeMap[identX].Type.String(), "func") {
return // Ignore functions as they are not interfaces.
}
xIsIface := strings.Contains(typeMap[identX].Type.String(), iface)
xIsNil := typeMap[identX].IsNil()
yisIface := strings.Contains(typeMap[identY].Type.String(), iface)
@@ -94,5 +97,6 @@ func handleConditionalExpression(exp *ast.BinaryExpr, pass *analysis.Pass) {
func reportFailure(pos token.Pos, pass *analysis.Pass) {
pass.Reportf(pos, "A single nilness check is being performed on an interface"+
", this check needs another accompanying nilness check on the underlying object for the interface.")
", this check needs another accompanying nilness check on the underlying object for the interface. "+
"Please use IsNil() if it is available")
}

View File

@@ -938,7 +938,7 @@ func (v *validator) logDuties(slot types.Slot, duties []*ethpb.DutiesResponse_Du
}
for i := types.Slot(0); i < params.BeaconConfig().SlotsPerEpoch; i++ {
startTime := slots.StartTime(v.genesisTime, slotOffset+i)
durationTillDuty := time.Until(startTime)
durationTillDuty := time.Until(startTime) + time.Second
if len(attesterKeys[i]) > 0 {
log.WithFields(logrus.Fields{