Compare commits

...

22 Commits

Author SHA1 Message Date
terence tsao
92a88e1ff2 Merge branch 'proposer-verify-attestation' of github.com:prysmaticlabs/prysm into terence 2023-08-08 14:39:39 -07:00
terence tsao
f1c6f07e87 Proposer metrics 2023-08-08 06:43:41 -07:00
terence tsao
441b7ee4b7 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-08 06:35:26 -07:00
terence tsao
b981477ae6 Fix(proposer): verify attestations without mutating state 2023-08-07 07:59:50 -07:00
m3diumrare
d2ff995eb2 Fix reported effective balance for unknown/pending validators (#12693)
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2023-08-07 09:19:02 -05:00
anukul
56a0315dde fix: set CoreService in beaconv1alpha1.Server (#12702) 2023-08-06 18:16:54 -07:00
terence tsao
535c1cb5c3 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-05 14:01:37 -07:00
anukul
634133fedc use struct in beacon-chain/rpc/core to store dependencies (#12701) 2023-08-05 22:54:12 +02:00
terence tsao
6619063187 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-04 12:40:29 -07:00
terencechain
c1c1b7ecfa Feat: aggregate parallel default (#12699) 2023-08-04 16:03:10 +00:00
Nishant Das
9a4670ec64 add changes (#12697)
Co-authored-by: terencechain <terence@prysmaticlabs.com>
2023-08-04 22:05:47 +08:00
Radosław Kapka
a664a07303 Multi Value Slice (#12616)
* multi value slice

* extract helper function

* comments

* setup godoc fix

* value benchmarks

* use guid

* fix bug when deleting items

* remove callback and rename MultiValue

* godoc

* tiny change

* Nishant's review

* typos

---------

Co-authored-by: Nishant Das <nishdas93@gmail.com>
2023-08-04 12:42:54 +00:00
terence tsao
c5e0caf80c Rm unused 2023-08-03 13:32:08 -07:00
Potuz
6f3dec79a4 block when updating caches 2023-08-03 13:16:42 -07:00
Potuz
99491be4a0 update NSC together with epoch boundary caches 2023-08-03 13:16:42 -07:00
terence tsao
8f3f278edd Don't panic 2023-08-03 13:16:42 -07:00
terence tsao
e80bffb1f0 Add profile 2023-08-03 13:16:42 -07:00
terence tsao
3058bc1661 Change logs 2023-08-03 13:16:42 -07:00
terencechain
830d331c6d Remove: span for convert to indexed attestation (#12687)
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2023-08-03 13:16:42 -07:00
Nishant Das
0960fba6d1 Minor Optimization on InnerShuffleList (#12690)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2023-08-03 13:16:42 -07:00
terencechain
2cec1b6699 Fix: use correct context for UpdateCommitteeCache (#12691) 2023-08-03 13:16:42 -07:00
terence tsao
439de222e2 Propose every slot and profile 2023-08-03 13:14:06 -07:00
44 changed files with 1526 additions and 170 deletions

View File

@@ -71,6 +71,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -3,6 +3,10 @@ package blockchain
import (
"bytes"
"context"
"fmt"
"path"
"runtime/pprof"
time2 "time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
@@ -15,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/io/file"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -55,6 +60,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
return err
}
bf := bytes.NewBuffer([]byte{})
if err := pprof.StartCPUProfile(bf); err != nil {
log.WithError(err).Error("could not start cpu profile")
}
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
if err != nil {
return errors.Wrap(err, "could not get block's prestate")
@@ -163,6 +173,14 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
log.WithError(err).Error("Unable to log state transition data")
}
pprof.StopCPUProfile()
if time.Since(receivedTime) > 3/2*time2.Second {
dbPath := path.Join("/home/t", fmt.Sprintf("processing_%d_%d.profile", blockCopy.Block().Slot(), time.Since(receivedTime).Milliseconds()))
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
log.WithError(err).Error("could not write profile")
}
}
chainServiceProcessingTime.Observe(float64(time.Since(receivedTime).Milliseconds()))
return nil

View File

@@ -97,7 +97,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
"direction": conn.Stat().Direction,
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}).Trace("Peer connected")
}
// Do not perform handshake on inbound dials.
@@ -173,7 +173,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
log.WithField("activePeers", len(s.peers.Active())).Trace("Peer disconnected")
}
}()
},

View File

@@ -24,6 +24,7 @@ go_library(
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/eth/beacon:go_default_library",
"//beacon-chain/rpc/eth/builder:go_default_library",
"//beacon-chain/rpc/eth/debug:go_default_library",

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"errors.go",
"log.go",
"service.go",
"validator.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core",
@@ -20,6 +21,7 @@ go_library(
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -0,0 +1,18 @@
package core
import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
)
type Service struct {
HeadFetcher blockchain.HeadFetcher
GenesisTimeFetcher blockchain.TimeFetcher
SyncChecker sync.Checker
Broadcaster p2p.Broadcaster
SyncCommitteePool synccommittee.Pool
OperationNotifier opfeed.Notifier
}

View File

@@ -5,10 +5,8 @@ import (
"context"
"fmt"
"sort"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
@@ -16,8 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
@@ -48,18 +44,21 @@ func (e *AggregateBroadcastFailedError) Error() string {
// ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on
// rewards and penalties throughout its lifecycle in the beacon chain.
func ComputeValidatorPerformance(
func (s *Service) ComputeValidatorPerformance(
ctx context.Context,
req *ethpb.ValidatorPerformanceRequest,
headFetcher blockchain.HeadFetcher,
currSlot primitives.Slot,
) (*ethpb.ValidatorPerformanceResponse, *RpcError) {
headState, err := headFetcher.HeadState(ctx)
if s.SyncChecker.Syncing() {
return nil, &RpcError{Reason: Unavailable, Err: errors.New("Syncing to latest head, not ready to respond")}
}
headState, err := s.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, &RpcError{Err: errors.Wrap(err, "could not get head state"), Reason: Internal}
}
currSlot := s.GenesisTimeFetcher.CurrentSlot()
if currSlot > headState.Slot() {
headRoot, err := headFetcher.HeadRoot(ctx)
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, &RpcError{Err: errors.Wrap(err, "could not get head root"), Reason: Internal}
}
@@ -200,21 +199,18 @@ func ComputeValidatorPerformance(
// SubmitSignedContributionAndProof is called by a sync committee aggregator
// to submit signed contribution and proof object.
func SubmitSignedContributionAndProof(
func (s *Service) SubmitSignedContributionAndProof(
ctx context.Context,
s *ethpb.SignedContributionAndProof,
broadcaster p2p.Broadcaster,
pool synccommittee.Pool,
notifier opfeed.Notifier,
req *ethpb.SignedContributionAndProof,
) *RpcError {
errs, ctx := errgroup.WithContext(ctx)
// Broadcasting and saving contribution into the pool in parallel. As one fail should not affect another.
errs.Go(func() error {
return broadcaster.Broadcast(ctx, s)
return s.Broadcaster.Broadcast(ctx, req)
})
if err := pool.SaveSyncCommitteeContribution(s.Message.Contribution); err != nil {
if err := s.SyncCommitteePool.SaveSyncCommitteeContribution(req.Message.Contribution); err != nil {
return &RpcError{Err: err, Reason: Internal}
}
@@ -224,10 +220,10 @@ func SubmitSignedContributionAndProof(
return &RpcError{Err: err, Reason: Internal}
}
notifier.OperationFeed().Send(&feed.Event{
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.SyncCommitteeContributionReceived,
Data: &opfeed.SyncCommitteeContributionReceivedData{
Contribution: s,
Contribution: req,
},
})
@@ -235,11 +231,9 @@ func SubmitSignedContributionAndProof(
}
// SubmitSignedAggregateSelectionProof verifies given aggregate and proofs and publishes them on appropriate gossipsub topic.
func SubmitSignedAggregateSelectionProof(
func (s *Service) SubmitSignedAggregateSelectionProof(
ctx context.Context,
req *ethpb.SignedAggregateSubmitRequest,
genesisTime time.Time,
broadcaster p2p.Broadcaster,
) *RpcError {
if req.SignedAggregateAndProof == nil || req.SignedAggregateAndProof.Message == nil ||
req.SignedAggregateAndProof.Message.Aggregate == nil || req.SignedAggregateAndProof.Message.Aggregate.Data == nil {
@@ -253,11 +247,11 @@ func SubmitSignedAggregateSelectionProof(
// As a preventive measure, a beacon node shouldn't broadcast an attestation whose slot is out of range.
if err := helpers.ValidateAttestationTime(req.SignedAggregateAndProof.Message.Aggregate.Data.Slot,
genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
s.GenesisTimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
return &RpcError{Err: errors.New("attestation slot is no longer valid from current time"), Reason: BadRequest}
}
if err := broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
if err := s.Broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal}
}

View File

@@ -67,6 +67,7 @@ go_test(
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/state:go_default_library",

View File

@@ -105,7 +105,7 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
http2.HandleError(w, "Could not convert request contribution to consensus contribution: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := core.SubmitSignedContributionAndProof(r.Context(), consensusItem, s.Broadcaster, s.SyncCommitteePool, s.OperationNotifier)
rpcError := s.CoreService.SubmitSignedContributionAndProof(r.Context(), consensusItem)
if rpcError != nil {
http2.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
}
@@ -131,8 +131,6 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
return
}
genesisTime := s.TimeFetcher.GenesisTime()
broadcastFailed := false
for _, item := range req.Data {
consensusItem, err := item.ToConsensus()
@@ -140,11 +138,9 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
http2.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := core.SubmitSignedAggregateSelectionProof(
rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(
r.Context(),
&ethpbalpha.SignedAggregateSubmitRequest{SignedAggregateAndProof: consensusItem},
genesisTime,
s.Broadcaster,
)
if rpcError != nil {
_, ok := rpcError.Err.(*core.AggregateBroadcastFailedError)

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
p2pmock "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -287,14 +288,16 @@ func TestGetAggregateAttestation_SameSlotAndRoot_ReturnMostAggregationBits(t *te
}
func TestSubmitContributionAndProofs(t *testing.T) {
s := &Server{
c := &core.Service{
OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(),
}
s := &Server{CoreService: c}
t.Run("single", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()
var body bytes.Buffer
_, err := body.WriteString(singleContribution)
@@ -306,15 +309,15 @@ func TestSubmitContributionAndProofs(t *testing.T) {
s.SubmitContributionAndProofs(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, 1, len(broadcaster.BroadcastMessages))
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
require.NoError(t, err)
assert.Equal(t, 1, len(contributions))
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()
var body bytes.Buffer
_, err := body.WriteString(multipleContributions)
@@ -326,13 +329,13 @@ func TestSubmitContributionAndProofs(t *testing.T) {
s.SubmitContributionAndProofs(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, 2, len(broadcaster.BroadcastMessages))
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
require.NoError(t, err)
assert.Equal(t, 2, len(contributions))
})
t.Run("invalid", func(t *testing.T) {
s.SyncCommitteePool = synccommittee.NewStore()
c.SyncCommitteePool = synccommittee.NewStore()
var body bytes.Buffer
_, err := body.WriteString(invalidContribution)
@@ -349,7 +352,7 @@ func TestSubmitContributionAndProofs(t *testing.T) {
})
t.Run("no body", func(t *testing.T) {
s.SyncCommitteePool = synccommittee.NewStore()
c.SyncCommitteePool = synccommittee.NewStore()
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
writer := httptest.NewRecorder()
@@ -365,13 +368,17 @@ func TestSubmitContributionAndProofs(t *testing.T) {
}
func TestSubmitAggregateAndProofs(t *testing.T) {
c := &core.Service{
GenesisTimeFetcher: &mockChain.ChainService{},
}
s := &Server{
TimeFetcher: &mockChain.ChainService{},
CoreService: c,
}
t.Run("single", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
c.Broadcaster = broadcaster
var body bytes.Buffer
_, err := body.WriteString(singleAggregate)
@@ -387,8 +394,8 @@ func TestSubmitAggregateAndProofs(t *testing.T) {
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()
var body bytes.Buffer
_, err := body.WriteString(multipleAggregates)

View File

@@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -32,4 +33,5 @@ type Server struct {
BeaconDB db.HeadAccessDatabase
BlockBuilder builder.BlockBuilder
OperationNotifier operation.Notifier
CoreService *core.Service
}

View File

@@ -103,6 +103,7 @@ go_test(
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",

View File

@@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -47,4 +48,5 @@ type Server struct {
SyncChecker sync.Checker
ReplayerBuilder stategen.ReplayerBuilder
OptimisticModeFetcher blockchain.OptimisticModeFetcher
CoreService *core.Service
}

View File

@@ -659,11 +659,7 @@ func (bs *Server) GetValidatorQueue(
func (bs *Server) GetValidatorPerformance(
ctx context.Context, req *ethpb.ValidatorPerformanceRequest,
) (*ethpb.ValidatorPerformanceResponse, error) {
if bs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
currSlot := bs.GenesisTimeFetcher.CurrentSlot()
response, err := core.ComputeValidatorPerformance(ctx, req, bs.HeadFetcher, currSlot)
response, err := bs.CoreService.ComputeValidatorPerformance(ctx, req)
if err != nil {
return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not compute validator performance: %v", err.Err)
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
dbTest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
@@ -1797,7 +1798,9 @@ func TestGetValidatorPerformance_Syncing(t *testing.T) {
ctx := context.Background()
bs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
CoreService: &core.Service{
SyncChecker: &mockSync.Sync{IsSyncing: true},
},
}
wanted := "Syncing to latest head, not ready to respond"
@@ -1857,11 +1860,13 @@ func TestGetValidatorPerformance_OK(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ethpb.ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
@@ -1918,12 +1923,14 @@ func TestGetValidatorPerformance_Indices(t *testing.T) {
require.NoError(t, headState.SetValidators(validators))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
}
c := headState.Copy()
vp, bp, err := precompute.New(ctx, c)
@@ -1988,12 +1995,14 @@ func TestGetValidatorPerformance_IndicesPubkeys(t *testing.T) {
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
}
c := headState.Copy()
vp, bp, err := precompute.New(ctx, c)
@@ -2064,11 +2073,13 @@ func TestGetValidatorPerformanceAltair_OK(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ethpb.ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
@@ -2132,11 +2143,13 @@ func TestGetValidatorPerformanceBellatrix_OK(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ethpb.ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
@@ -2200,11 +2213,13 @@ func TestGetValidatorPerformanceCapella_OK(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
bs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ethpb.ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},

View File

@@ -76,6 +76,7 @@ go_library(
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//encoding/ssz:go_default_library",
"//io/file:go_default_library",
"//monitoring/tracing:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -113,7 +113,7 @@ func (vs *Server) SubmitSignedAggregateSelectionProof(
ctx context.Context,
req *ethpb.SignedAggregateSubmitRequest,
) (*ethpb.SignedAggregateSubmitResponse, error) {
if err := core.SubmitSignedAggregateSelectionProof(ctx, req, vs.TimeFetcher.GenesisTime(), vs.P2P); err != nil {
if err := vs.CoreService.SubmitSignedAggregateSelectionProof(ctx, req); err != nil {
return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not submit aggregate: %v", err.Err)
}
return &ethpb.SignedAggregateSubmitResponse{}, nil

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
@@ -431,7 +432,11 @@ func TestSubmitSignedAggregateSelectionProof_ZeroHashesSignatures(t *testing.T)
func TestSubmitSignedAggregateSelectionProof_InvalidSlot(t *testing.T) {
c := &mock.ChainService{Genesis: time.Now()}
aggregatorServer := &Server{TimeFetcher: c}
aggregatorServer := &Server{
CoreService: &core.Service{
GenesisTimeFetcher: c,
},
}
req := &ethpb.SignedAggregateSubmitRequest{
SignedAggregateAndProof: &ethpb.SignedAggregateAttestationAndProof{
Signature: []byte{'a'},

View File

@@ -1,9 +1,12 @@
package validator
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"path"
"runtime/pprof"
"strings"
"sync"
"time"
@@ -12,6 +15,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
@@ -25,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/io/file"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
@@ -42,6 +48,17 @@ const (
eth1dataTimeout = 2 * time.Second
)
var (
blockProposalTimeMilliseconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "block_proposal_time_milliseconds",
Help: "Total time in milliseconds to propose a block",
})
localPayloadValue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "local_payload_value_gwei",
Help: "The value of the local payload",
})
)
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot.
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
@@ -58,6 +75,12 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
"sinceSlotStartTime": time.Since(t),
}).Info("Begin building block")
bf := bytes.NewBuffer([]byte{})
startTime := time.Now()
if err := pprof.StartCPUProfile(bf); err != nil {
log.WithError(err).Error("could not start cpu profile")
}
// A syncing validator should not produce a block.
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
@@ -169,6 +192,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
"validator": sBlk.Block().ProposerIndex(),
}).Info("Finished building block")
blockProposalTimeMilliseconds.Observe(float64(time.Since(t).Milliseconds()))
pprof.StopCPUProfile()
if time.Since(startTime) > 500*time.Millisecond {
dbPath := path.Join("/home/t", fmt.Sprintf("%d_%d_proposer.profile", req.Slot, time.Since(startTime).Milliseconds()))
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
log.WithError(err).Error("could not write profile")
}
}
pb, err := sBlk.Block().Proto()
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not convert block to proto: %v", err)

View File

@@ -6,7 +6,6 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -15,7 +14,6 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation"
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"go.opencensus.io/trace"
)
@@ -83,26 +81,9 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
func (a proposerAtts) filter(ctx context.Context, st state.BeaconState) (proposerAtts, proposerAtts) {
validAtts := make([]*ethpb.Attestation, 0, len(a))
invalidAtts := make([]*ethpb.Attestation, 0, len(a))
var attestationProcessor func(context.Context, state.BeaconState, *ethpb.Attestation) (state.BeaconState, error)
if st.Version() == version.Phase0 {
attestationProcessor = blocks.ProcessAttestationNoVerifySignature
} else if st.Version() >= version.Altair {
// Use a wrapper here, as go needs strong typing for the function signature.
attestationProcessor = func(ctx context.Context, st state.BeaconState, attestation *ethpb.Attestation) (state.BeaconState, error) {
totalBalance, err := helpers.TotalActiveBalance(st)
if err != nil {
return nil, err
}
return altair.ProcessAttestationNoVerifySignature(ctx, st, attestation, totalBalance)
}
} else {
// Exit early if there is an unknown state type.
return validAtts, invalidAtts
}
for _, att := range a {
if _, err := attestationProcessor(ctx, st, att); err == nil {
if err := blocks.VerifyAttestationNoVerifySignature(ctx, st, att); err == nil {
validAtts = append(validAtts, att)
continue
}

View File

@@ -53,6 +53,11 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc
if localPayload == nil {
return errors.New("local payload is nil")
}
localValueGwei, err := localPayload.ValueInGwei()
if err != nil {
return errors.Wrap(err, "failed to get local payload value")
}
localPayloadValue.Set(float64(localValueGwei))
// Use local payload if builder payload is nil.
if builderPayload == nil {

View File

@@ -54,7 +54,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
slot := blk.Slot()
vIdx := blk.ProposerIndex()
headRoot := blk.ParentRoot()
proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
_, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
feeRecipient := params.BeaconConfig().DefaultFeeRecipient
recipient, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, vIdx)
switch err == nil {
@@ -76,7 +76,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
return nil, errors.Wrap(err, "could not get fee recipient in db")
}
if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
if ok && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
var pid [8]byte
copy(pid[:], payloadId[:])
payloadIDCacheHit.Inc()

View File

@@ -23,6 +23,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
@@ -30,6 +31,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
@@ -74,6 +76,7 @@ type Server struct {
BlockBuilder builder.BlockBuilder
BLSChangesPool blstoexec.PoolManager
ClockWaiter startup.ClockWaiter
CoreService *core.Service
}
// WaitForActivation checks if a validator public key exists in the active validator registry of the current
@@ -184,3 +187,32 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
}
return stream.Send(res)
}
func (vs *Server) ProposeEverySlot() {
for vs.TimeFetcher.GenesisTime().IsZero() {
time.Sleep(time.Second)
}
genTime := vs.TimeFetcher.GenesisTime()
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-vs.Ctx.Done():
ticker.Done()
return
case slot := <-ticker.C():
curr := time.Now()
time.Sleep(50 * time.Millisecond)
_, err := vs.GetBeaconBlock(context.Background(), &ethpb.BlockRequest{
Slot: slot,
Graffiti: make([]byte, 32),
RandaoReveal: make([]byte, 96),
})
if err != nil {
log.Error(err)
continue
}
log.Infof("Successfully produced block %d in %s", slot, time.Since(curr).String())
}
}
}

View File

@@ -120,7 +120,7 @@ func (vs *Server) GetSyncCommitteeContribution(
func (vs *Server) SubmitSignedContributionAndProof(
ctx context.Context, s *ethpb.SignedContributionAndProof,
) (*emptypb.Empty, error) {
err := core.SubmitSignedContributionAndProof(ctx, s, vs.P2P, vs.SyncCommitteePool, vs.OperationNotifier)
err := vs.CoreService.SubmitSignedContributionAndProof(ctx, s)
if err != nil {
return &emptypb.Empty{}, status.Errorf(core.ErrorReasonToGRPC(err.Reason), err.Err.Error())
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
@@ -137,9 +138,11 @@ func TestGetSyncCommitteeContribution_FiltersDuplicates(t *testing.T) {
func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
server := &Server{
SyncCommitteePool: synccommittee.NewStore(),
P2P: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
Broadcaster: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
},
}
contribution := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
@@ -151,21 +154,23 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
}
_, err := server.SubmitSignedContributionAndProof(context.Background(), contribution)
require.NoError(t, err)
savedMsgs, err := server.SyncCommitteePool.SyncCommitteeContributions(1)
savedMsgs, err := server.CoreService.SyncCommitteePool.SyncCommitteeContributions(1)
require.NoError(t, err)
require.DeepEqual(t, []*ethpb.SyncCommitteeContribution{contribution.Message.Contribution}, savedMsgs)
}
func TestSubmitSignedContributionAndProof_Notification(t *testing.T) {
server := &Server{
SyncCommitteePool: synccommittee.NewStore(),
P2P: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
Broadcaster: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
},
}
// Subscribe to operation notifications.
opChannel := make(chan *feed.Event, 1024)
opSub := server.OperationNotifier.OperationFeed().Subscribe(opChannel)
opSub := server.CoreService.OperationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
contribution := &ethpb.SignedContributionAndProof{

View File

@@ -26,6 +26,7 @@ go_test(
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//config/params:go_default_library",

View File

@@ -2,6 +2,7 @@ package validator
import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
)
@@ -11,4 +12,5 @@ type Server struct {
GenesisTimeFetcher blockchain.TimeFetcher
SyncChecker sync.Checker
HeadFetcher blockchain.HeadFetcher
CoreService *core.Service
}

View File

@@ -29,12 +29,6 @@ type ValidatorPerformanceResponse struct {
// GetValidatorPerformance is an HTTP handler for GetValidatorPerformance.
func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request) {
if vs.SyncChecker.Syncing() {
handleHTTPError(w, "Syncing", http.StatusServiceUnavailable)
return
}
ctx := r.Context()
currSlot := vs.GenesisTimeFetcher.CurrentSlot()
var req ValidatorPerformanceRequest
if r.Body != http.NoBody {
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -42,14 +36,12 @@ func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request
return
}
}
computed, err := core.ComputeValidatorPerformance(
ctx,
computed, err := vs.CoreService.ComputeValidatorPerformance(
r.Context(),
&ethpb.ValidatorPerformanceRequest{
PublicKeys: req.PublicKeys,
Indices: req.Indices,
},
vs.HeadFetcher,
currSlot,
)
if err != nil {
handleHTTPError(w, "Could not compute validator performance: "+err.Err.Error(), core.ErrorReasonToHTTP(err.Reason))

View File

@@ -14,6 +14,7 @@ import (
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/v4/config/params"
@@ -28,12 +29,13 @@ import (
func TestServer_GetValidatorPerformance(t *testing.T) {
t.Run("Syncing", func(t *testing.T) {
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
CoreService: &core.Service{
SyncChecker: &mockSync.Sync{IsSyncing: true},
},
}
var buf bytes.Buffer
srv := httptest.NewServer(http.HandlerFunc(vs.GetValidatorPerformance))
req := httptest.NewRequest("POST", "/foo", &buf)
req := httptest.NewRequest("POST", "/foo", nil)
client := &http.Client{}
rawResp, err := client.Post(srv.URL, "application/json", req.Body)
@@ -57,11 +59,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
@@ -111,12 +115,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
}
c := headState.Copy()
vp, bp, err := precompute.New(ctx, c)
@@ -174,12 +180,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
// 10 epochs into the future.
State: headState,
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
},
SyncChecker: &mockSync.Sync{IsSyncing: false},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
}
c := headState.Copy()
vp, bp, err := precompute.New(ctx, c)
@@ -243,11 +251,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
@@ -303,11 +313,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
@@ -363,11 +375,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
vs := &Server{
HeadFetcher: &mock.ChainService{
State: headState,
CoreService: &core.Service{
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
},
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
want := &ValidatorPerformanceResponse{
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},

View File

@@ -30,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/beacon"
rpcBuilder "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/builder"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/debug"
@@ -231,6 +232,15 @@ func (s *Service) Start() {
}
s.cfg.Router.HandleFunc("/eth/v1/builder/states/{state_id}/expected_withdrawals", builderServer.ExpectedWithdrawals).Methods(http.MethodGet)
coreService := &core.Service{
HeadFetcher: s.cfg.HeadFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
SyncChecker: s.cfg.SyncService,
Broadcaster: s.cfg.Broadcaster,
SyncCommitteePool: s.cfg.SyncCommitteeObjectPool,
OperationNotifier: s.cfg.OperationNotifier,
}
validatorServer := &validatorv1alpha1.Server{
Ctx: s.ctx,
AttestationCache: cache.NewAttestationCache(),
@@ -266,6 +276,7 @@ func (s *Service) Start() {
BlockBuilder: s.cfg.BlockBuilder,
BLSChangesPool: s.cfg.BLSChangesPool,
ClockWaiter: s.cfg.ClockWaiter,
CoreService: coreService,
}
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
@@ -283,6 +294,7 @@ func (s *Service) Start() {
BeaconDB: s.cfg.BeaconDB,
BlockBuilder: s.cfg.BlockBuilder,
OperationNotifier: s.cfg.OperationNotifier,
CoreService: coreService,
}
s.cfg.Router.HandleFunc("/eth/v1/validator/aggregate_attestation", validatorServerV1.GetAggregateAttestation).Methods(http.MethodGet)
@@ -354,6 +366,7 @@ func (s *Service) Start() {
ReceivedAttestationsBuffer: make(chan *ethpbv1alpha1.Attestation, attestationBufferSize),
CollectedAttestationsBuffer: make(chan []*ethpbv1alpha1.Attestation, attestationBufferSize),
ReplayerBuilder: ch,
CoreService: coreService,
}
beaconChainServerV1 := &beacon.Server{
CanonicalHistory: ch,
@@ -384,6 +397,7 @@ func (s *Service) Start() {
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
HeadFetcher: s.cfg.HeadFetcher,
SyncChecker: s.cfg.SyncService,
CoreService: coreService,
}
s.cfg.Router.HandleFunc("/prysm/validators/performance", httpServer.GetValidatorPerformance).Methods(http.MethodPost)
s.cfg.Router.HandleFunc("/eth/v2/beacon/blocks", beaconChainServerV1.PublishBlockV2).Methods(http.MethodPost)
@@ -429,6 +443,8 @@ func (s *Service) Start() {
// Register reflection service on gRPC server.
reflection.Register(s.grpcServer)
go validatorServer.ProposeEverySlot()
go func() {
if s.listener != nil {
if err := s.grpcServer.Serve(s.listener); err != nil {

View File

@@ -210,7 +210,7 @@ func (q *blocksQueue) loop() {
"epoch": slots.ToEpoch(fsm.start),
"start": fsm.start,
"error": err.Error(),
}).Debug("Can not trigger event")
}).Trace("Can not trigger event")
if errors.Is(err, errNoRequiredPeers) {
forceExit := q.exitConditions.noRequiredPeersErrRetries > noRequiredPeersErrMaxRetries
if q.mode == modeStopOnFinalizedEpoch || forceExit {

View File

@@ -189,7 +189,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return
@@ -209,7 +209,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return

View File

@@ -45,8 +45,10 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
return err
}
s.rateLimiter.add(stream, 1)
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
if goodbyeMessage(*m) != "client has too many peers" {
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
}
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
// closes all streams with the peer
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
@@ -87,7 +89,7 @@ func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code p2ptypes.RP
log.WithFields(logrus.Fields{
"error": err,
"peer": id,
}).Debug("Could not send goodbye message to peer")
}).Trace("Could not send goodbye message to peer")
}
return s.cfg.p2p.Disconnect(id)
}

View File

@@ -18,6 +18,7 @@ type LRU[K comparable, V any] struct {
evictList *lruList[K, V]
items map[K]*entry[K, V]
onEvict EvictCallback[K, V]
getChan chan *entry[K, V]
}
// NewLRU constructs an LRU of the given size
@@ -25,13 +26,19 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K,
if size <= 0 {
return nil, errors.New("must provide a positive size")
}
// Initialize the channel buffer size as being 10% of the cache size.
chanSize := size / 10
c := &LRU[K, V]{
size: size,
evictList: newList[K, V](),
items: make(map[K]*entry[K, V]),
onEvict: onEvict,
getChan: make(chan *entry[K, V], chanSize),
}
// Spin off separate go-routine to handle evict list
// operations.
go c.handleGetRequests()
return c, nil
}
@@ -77,12 +84,7 @@ func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
c.itemsLock.RUnlock()
// Make this get function non-blocking for multiple readers.
go func() {
c.evictListLock.Lock()
c.evictList.moveToFront(ent)
c.evictListLock.Unlock()
}()
c.getChan <- ent
return ent.value, true
}
c.itemsLock.RUnlock()
@@ -133,3 +135,12 @@ func (c *LRU[K, V]) removeElement(e *entry[K, V]) {
c.onEvict(e.key, e.value)
}
}
func (c *LRU[K, V]) handleGetRequests() {
for {
entry := <-c.getChan
c.evictListLock.Lock()
c.evictList.moveToFront(entry)
c.evictListLock.Unlock()
}
}

View File

@@ -73,7 +73,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
log.WithError(err).Debug("Could not decode goodbye stream message")
return
}
log.WithError(err).Debug("Could not decode stream message")
log.WithError(err).Trace("Could not decode stream message")
return
}
if err := handle(context.Background(), msg, stream); err != nil {
@@ -89,7 +89,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
return
}
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).Debug("Could not decode stream message")
log.WithError(err).Trace("Could not decode stream message")
return
}
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {

View File

@@ -230,9 +230,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(disableBuildBlockParallel)
cfg.BuildBlockParallel = false
}
if ctx.IsSet(aggregateParallel.Name) {
logEnabled(aggregateParallel)
cfg.AggregateParallel = true
cfg.AggregateParallel = true
if ctx.IsSet(disableAggregateParallel.Name) {
logEnabled(disableAggregateParallel)
cfg.AggregateParallel = false
}
if ctx.IsSet(disableResourceManager.Name) {
logEnabled(disableResourceManager)

View File

@@ -38,6 +38,11 @@ var (
Usage: deprecatedUsage,
Hidden: true,
}
deprecatedAggregateParallel = &cli.BoolFlag{
Name: "aggregate-parallel",
Usage: deprecatedUsage,
Hidden: true,
}
)
// Deprecated flags for both the beacon node and validator client.
@@ -48,6 +53,7 @@ var deprecatedFlags = []cli.Flag{
deprecatedDisableGossipBatchAggregation,
deprecatedBuildBlockParallel,
deprecatedEnableRegistrationCache,
deprecatedAggregateParallel,
}
// deprecatedBeaconFlags contains flags that are still used by other components

View File

@@ -155,9 +155,9 @@ var (
Usage: "A temporary flag for disabling the validator registration cache instead of using the db. note: registrations do not clear on restart while using the db",
}
aggregateParallel = &cli.BoolFlag{
Name: "aggregate-parallel",
Usage: "Enables parallel aggregation of attestations",
disableAggregateParallel = &cli.BoolFlag{
Name: "disable-aggregate-parallel",
Usage: "Disables parallel aggregation of attestations",
}
)
@@ -212,7 +212,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
aggregateThirdInterval,
disableResourceManager,
DisableRegistrationCache,
aggregateParallel,
disableAggregateParallel,
}...)...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.

View File

@@ -0,0 +1,23 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["multi_value_slice.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/container/multi-value-slice",
visibility = ["//visibility:public"],
deps = [
"//container/multi-value-slice/interfaces:go_default_library",
"@com_github_google_uuid//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["multi_value_slice_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_google_uuid//:go_default_library",
],
)

View File

@@ -0,0 +1,9 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["interfaces.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/container/multi-value-slice/interfaces",
visibility = ["//visibility:public"],
deps = ["@com_github_google_uuid//:go_default_library"],
)

View File

@@ -0,0 +1,9 @@
package interfaces
import "github.com/google/uuid"
// Identifiable represents an object that can be uniquely identified by its Id.
type Identifiable interface {
Id() uuid.UUID
SetId(id uuid.UUID)
}

View File

@@ -0,0 +1,488 @@
// Package mvslice defines a multi value slice container. The purpose of the container is to be a replacement for a slice
// in scenarios where many objects of the same type share a copy of an identical or nearly identical slice.
// In such case using the multi value slice should result in less memory allocation because many values of the slice can be shared between objects.
//
// The multi value slice should be initialized by calling the Init function and passing the initial values of the slice.
// After initializing the slice, it can be shared between object by using the Copy function.
// Note that simply assigning the same multi value slice to several objects is not enough for it to work properly.
// Calling Copy is required in most circumstances (an exception is when the source object has only shared values).
//
// s := &Slice[int, *testObject]{}
// s.Init([]int{1, 2, 3})
// src := &testObject{id: id1, slice: s} // id1 is some UUID
// dst := &testObject{id: id2, slice: s} // id2 is some UUID
// s.Copy(src, dst)
//
// Each Value stores a value of type V along with identifiers to objects that have this value.
// A MultiValueItem is a slice of Value elements. A Slice contains shared items, individual items and appended items.
//
// You can think of a shared value as the original value (i.e. the value at the point in time when the multi value slice was constructed),
// and of an individual value as a changed value.
// There is no notion of a shared appended value because appended values never have an original value (appended values are empty when the slice is created).
//
// Whenever any of the slices functions (apart from Init) is called, the function needs to know which object it is dealing with.
// This is because if an object has an individual/appended value, the function must get/set/change this particular value instead of the shared value
// or another individual/appended value.
//
// The way appended items are stored is as follows. Lets say appended items were a regular slice that is initially empty,
// and we append an item for object0 and then append another item for object1.
// Now we have two items in the slice, but object1 only has an item in index 1. This makes things very confusing and hard to deal with.
// If we make appended items a []*Value, things dont become much better.
// It is therefore easiest to make appended items a []*MultiValueItem, which allows each object to have its own values starting at index 0
// and not having any “gaps”.
//
// The Detach function should be called when an object gets garbage collected.
// Its purpose is to clean up the slice from individual/appended values of the collected object.
// Otherwise the slice will get polluted with values for non-existing objects.
//
// Example diagram illustrating what happens after copying, updating and detaching:
//
// Create object o1 with value 10. At this point we only have a shared value.
//
// ===================
// shared | individual
// ===================
// 10 |
//
// Copy object o1 to object o2. o2 shares the value with o1, no individual value is created.
//
// ===================
// shared | individual
// ===================
// 10 |
//
// Update value of object o2 to 20. An individual value is created.
//
// ===================
// shared | individual
// ===================
// 10 | 20: [o2]
//
// Copy object o2 to object o3. The individual value's object list is updated.
//
// ===================
// shared | individual
// ===================
// 10 | 20: [o2,o3]
//
// Update value of object o3 to 30. There are two individual values now, one for o2 and one for o3.
//
// ===================
// shared | individual
// ===================
// 10 | 20: [o2]
// | 30: [o3]
//
// Update value of object o2 to 10. o2 no longer has an individual value
// because it got "reverted" to the original, shared value,
//
// ===================
// shared | individual
// ===================
// 10 | 30: [o3]
//
// Detach object o3. Individual value for o3 is removed.
//
// ===================
// shared | individual
// ===================
// 10 |
package mvslice
import (
"fmt"
"sync"
"github.com/google/uuid"
"github.com/prysmaticlabs/prysm/v4/container/multi-value-slice/interfaces"
)
// MultiValueSlice defines an abstraction over all concrete implementations of the generic Slice.
type MultiValueSlice[O interfaces.Identifiable] interface {
Len(obj O) uuid.UUID
}
// Value defines a single value along with one or more IDs that share this value.
type Value[V any] struct {
val V
ids []uuid.UUID
}
// MultiValueItem defines a collection of Value items.
type MultiValueItem[V any] struct {
Values []*Value[V]
}
// Slice is the main component of the multi-value slice data structure. It has two type parameters:
// - V comparable - the type of values stored the slice. The constraint is required
// because certain operations (e.g. updating, appending) have to compare values against each other.
// - O interfaces.Identifiable - the type of objects sharing the slice. The constraint is required
// because we need a way to compare objects against each other in order to know which objects
// values should be accessed.
type Slice[V comparable, O interfaces.Identifiable] struct {
sharedItems []V
individualItems map[uint64]*MultiValueItem[V]
appendedItems []*MultiValueItem[V]
cachedLengths map[uuid.UUID]int
lock sync.RWMutex
}
// Init initializes the slice with sensible defaults. Input values are assigned to shared items.
func (s *Slice[V, O]) Init(items []V) {
s.sharedItems = items
s.individualItems = map[uint64]*MultiValueItem[V]{}
s.appendedItems = []*MultiValueItem[V]{}
s.cachedLengths = map[uuid.UUID]int{}
}
// Len returns the number of items for the input object.
func (s *Slice[V, O]) Len(obj O) int {
s.lock.RLock()
defer s.lock.RUnlock()
l, ok := s.cachedLengths[obj.Id()]
if !ok {
return len(s.sharedItems)
}
return l
}
// Copy copies items between the source and destination.
func (s *Slice[V, O]) Copy(src O, dst O) {
s.lock.Lock()
defer s.lock.Unlock()
for _, item := range s.individualItems {
for _, v := range item.Values {
_, found := containsId(v.ids, src.Id())
if found {
v.ids = append(v.ids, dst.Id())
break
}
}
}
for _, item := range s.appendedItems {
found := false
for _, v := range item.Values {
_, found = containsId(v.ids, src.Id())
if found {
v.ids = append(v.ids, dst.Id())
break
}
}
if !found {
// This is an optimization. If we didn't find an appended item at index i,
// then all larger indices don't have an appended item for the object either.
break
}
}
srcLen, ok := s.cachedLengths[src.Id()]
if ok {
s.cachedLengths[dst.Id()] = srcLen
}
}
// Value returns all items for the input object.
func (s *Slice[V, O]) Value(obj O) []V {
s.lock.RLock()
defer s.lock.RUnlock()
l, ok := s.cachedLengths[obj.Id()]
if ok {
result := make([]V, l)
s.fillOriginalItems(obj, &result)
sharedLen := len(s.sharedItems)
for i, item := range s.appendedItems {
found := false
for _, v := range item.Values {
_, found = containsId(v.ids, obj.Id())
if found {
result[sharedLen+i] = v.val
break
}
}
if !found {
// This is an optimization. If we didn't find an appended item at index i,
// then all larger indices don't have an appended item for the object either.
return result
}
}
return result
} else {
result := make([]V, len(s.sharedItems))
s.fillOriginalItems(obj, &result)
return result
}
}
// At returns the item at the requested index for the input object.
// Appended items' indices are always larger than shared/individual items' indices.
// We first check if the index is within the length of shared items.
// If it is, then we return an individual value at that index - if it exists - or a shared value otherwise.
// If the index is beyond the length of shared values, it is an appended item and that's what gets returned.
func (s *Slice[V, O]) At(obj O, index uint64) (V, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if index >= uint64(len(s.sharedItems)+len(s.appendedItems)) {
var def V
return def, fmt.Errorf("index %d out of bounds", index)
}
isOriginal := index < uint64(len(s.sharedItems))
if isOriginal {
ind, ok := s.individualItems[index]
if !ok {
return s.sharedItems[index], nil
}
for _, v := range ind.Values {
for _, id := range v.ids {
if id == obj.Id() {
return v.val, nil
}
}
}
return s.sharedItems[index], nil
} else {
item := s.appendedItems[index-uint64(len(s.sharedItems))]
for _, v := range item.Values {
for _, id := range v.ids {
if id == obj.Id() {
return v.val, nil
}
}
}
var def V
return def, fmt.Errorf("index %d out of bounds", index)
}
}
// UpdateAt updates the item at the required index for the input object to the passed in value.
func (s *Slice[V, O]) UpdateAt(obj O, index uint64, val V) error {
s.lock.Lock()
defer s.lock.Unlock()
if index >= uint64(len(s.sharedItems)+len(s.appendedItems)) {
return fmt.Errorf("index %d out of bounds", index)
}
isOriginal := index < uint64(len(s.sharedItems))
if isOriginal {
s.updateOriginalItem(obj, index, val)
return nil
}
return s.updateAppendedItem(obj, index, val)
}
// Append adds a new item to the input object.
func (s *Slice[V, O]) Append(obj O, val V) {
s.lock.Lock()
defer s.lock.Unlock()
if len(s.appendedItems) == 0 {
s.appendedItems = append(s.appendedItems, &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}})
s.cachedLengths[obj.Id()] = len(s.sharedItems) + 1
return
}
for _, item := range s.appendedItems {
found := false
for _, v := range item.Values {
_, found = containsId(v.ids, obj.Id())
if found {
break
}
}
if !found {
newValue := true
for _, v := range item.Values {
if v.val == val {
v.ids = append(v.ids, obj.Id())
newValue = false
break
}
}
if newValue {
item.Values = append(item.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
}
l, ok := s.cachedLengths[obj.Id()]
if ok {
s.cachedLengths[obj.Id()] = l + 1
} else {
s.cachedLengths[obj.Id()] = len(s.sharedItems) + 1
}
return
}
}
s.appendedItems = append(s.appendedItems, &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}})
s.cachedLengths[obj.Id()] = s.cachedLengths[obj.Id()] + 1
}
// Detach removes the input object from the multi-value slice.
// What this means in practice is that we remove all individual and appended values for that object and clear the cached length.
func (s *Slice[V, O]) Detach(obj O) {
s.lock.Lock()
defer s.lock.Unlock()
for i, ind := range s.individualItems {
for vi, v := range ind.Values {
foundIndex, found := containsId(v.ids, obj.Id())
if found {
if len(v.ids) == 1 {
if len(ind.Values) == 1 {
delete(s.individualItems, i)
} else {
ind.Values = deleteElemFromSlice(ind.Values, vi)
}
} else {
v.ids = deleteElemFromSlice(v.ids, foundIndex)
}
break
}
}
}
for _, item := range s.appendedItems {
found := false
for vi, v := range item.Values {
var foundIndex int
foundIndex, found = containsId(v.ids, obj.Id())
if found {
if len(v.ids) == 1 {
item.Values = deleteElemFromSlice(item.Values, vi)
} else {
v.ids = deleteElemFromSlice(v.ids, foundIndex)
}
break
}
}
if !found {
// This is an optimization. If we didn't find an appended item at index i,
// then all larger indices don't have an appended item for the object either.
break
}
}
delete(s.cachedLengths, obj.Id())
}
func (s *Slice[V, O]) fillOriginalItems(obj O, items *[]V) {
for i, item := range s.sharedItems {
ind, ok := s.individualItems[uint64(i)]
if !ok {
(*items)[i] = item
} else {
found := false
for _, v := range ind.Values {
_, found = containsId(v.ids, obj.Id())
if found {
(*items)[i] = v.val
break
}
}
if !found {
(*items)[i] = item
}
}
}
}
func (s *Slice[V, O]) updateOriginalItem(obj O, index uint64, val V) {
ind, ok := s.individualItems[index]
if ok {
for mvi, v := range ind.Values {
// if we find an existing value, we remove it
foundIndex, found := containsId(v.ids, obj.Id())
if found {
if len(v.ids) == 1 {
// There is an improvement to be made here. If len(ind.Values) == 1,
// then after removing the item from the slice s.individualItems[i]
// will be a useless map entry whose value is an empty slice.
ind.Values = deleteElemFromSlice(ind.Values, mvi)
} else {
v.ids = deleteElemFromSlice(v.ids, foundIndex)
}
break
}
}
}
if val == s.sharedItems[index] {
return
}
if !ok {
s.individualItems[index] = &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}}
} else {
newValue := true
for _, v := range ind.Values {
if v.val == val {
v.ids = append(v.ids, obj.Id())
newValue = false
break
}
}
if newValue {
ind.Values = append(ind.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
}
}
}
func (s *Slice[V, O]) updateAppendedItem(obj O, index uint64, val V) error {
item := s.appendedItems[index-uint64(len(s.sharedItems))]
found := false
for vi, v := range item.Values {
var foundIndex int
// if we find an existing value, we remove it
foundIndex, found = containsId(v.ids, obj.Id())
if found {
if len(v.ids) == 1 {
item.Values = deleteElemFromSlice(item.Values, vi)
} else {
v.ids = deleteElemFromSlice(v.ids, foundIndex)
}
break
}
}
if !found {
return fmt.Errorf("index %d out of bounds", index)
}
newValue := true
for _, v := range item.Values {
if v.val == val {
v.ids = append(v.ids, obj.Id())
newValue = false
break
}
}
if newValue {
item.Values = append(item.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
}
return nil
}
func containsId(ids []uuid.UUID, wanted uuid.UUID) (int, bool) {
for i, id := range ids {
if id == wanted {
return i, true
}
}
return 0, false
}
// deleteElemFromSlice does not relocate the slice, but it also does not preserve the order of items.
// This is not a problem here because the order of values in a MultiValueItem and object IDs doesn't matter.
func deleteElemFromSlice[T any](s []T, i int) []T {
s[i] = s[len(s)-1] // Copy last element to index i.
s = s[:len(s)-1] // Truncate slice.
return s
}

View File

@@ -0,0 +1,664 @@
package mvslice
import (
"math/rand"
"testing"
"github.com/google/uuid"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
var (
id1 = uuid.New()
id2 = uuid.New()
id999 = uuid.New()
)
type testObject struct {
id uuid.UUID
slice *Slice[int, *testObject]
}
func (o *testObject) Id() uuid.UUID {
return o.id
}
func (o *testObject) SetId(id uuid.UUID) {
o.id = id
}
func TestLen(t *testing.T) {
s := &Slice[int, *testObject]{}
s.Init([]int{1, 2, 3})
id := uuid.New()
s.cachedLengths[id] = 123
t.Run("cached", func(t *testing.T) {
assert.Equal(t, 123, s.Len(&testObject{id: id}))
})
t.Run("not cached", func(t *testing.T) {
assert.Equal(t, 3, s.Len(&testObject{id: uuid.New()}))
})
}
func TestCopy(t *testing.T) {
// What we want to check:
// - shared value is copied
// - when the source object has an individual value, it is copied
// - when the source object does not have an individual value, the shared value is copied
// - when the source object has an appended value, it is copied
// - when the source object does not have an appended value, nothing is copied
// - length of destination object is cached
s := setup()
src := &testObject{id: id1, slice: s}
dst := &testObject{id: id999, slice: s}
s.Copy(src, dst)
assert.Equal(t, (*MultiValueItem[int])(nil), dst.slice.individualItems[0])
assertIndividualFound(t, s, dst.id, 1, 1)
assertIndividualFound(t, s, dst.id, 2, 3)
assertIndividualFound(t, s, dst.id, 3, 1)
assertIndividualNotFound(t, s, dst.id, 4)
assertAppendedFound(t, s, dst.id, 0, 1)
assertAppendedFound(t, s, dst.id, 1, 3)
assertAppendedNotFound(t, s, dst.id, 2)
l, ok := s.cachedLengths[id999]
require.Equal(t, true, ok)
assert.Equal(t, 7, l)
}
func TestValue(t *testing.T) {
// What we want to check:
// - correct values are returned for first object
// - correct values are returned for second object
// - correct values are returned for an object without appended items
s := setup()
first := &testObject{id: id1, slice: s}
second := &testObject{id: id2, slice: s}
v := s.Value(first)
require.Equal(t, 7, len(v))
assert.Equal(t, 123, v[0])
assert.Equal(t, 1, v[1])
assert.Equal(t, 3, v[2])
assert.Equal(t, 1, v[3])
assert.Equal(t, 123, v[4])
assert.Equal(t, 1, v[5])
assert.Equal(t, 3, v[6])
v = s.Value(second)
require.Equal(t, 8, len(v))
assert.Equal(t, 123, v[0])
assert.Equal(t, 2, v[1])
assert.Equal(t, 3, v[2])
assert.Equal(t, 123, v[3])
assert.Equal(t, 2, v[4])
assert.Equal(t, 2, v[5])
assert.Equal(t, 3, v[6])
assert.Equal(t, 2, v[7])
s = &Slice[int, *testObject]{}
s.Init([]int{1, 2, 3})
id := uuid.New()
v = s.Value(&testObject{id: id})
require.Equal(t, 3, len(v))
assert.Equal(t, 1, v[0])
assert.Equal(t, 2, v[1])
assert.Equal(t, 3, v[2])
}
func TestAt(t *testing.T) {
// What we want to check:
// - correct values are returned for first object
// - correct values are returned for second object
// - ERROR when index too large in general
// - ERROR when index not too large in general, but too large for an object
s := setup()
first := &testObject{id: id1, slice: s}
second := &testObject{id: id2, slice: s}
v, err := s.At(first, 0)
require.NoError(t, err)
assert.Equal(t, 123, v)
v, err = s.At(first, 1)
require.NoError(t, err)
assert.Equal(t, 1, v)
v, err = s.At(first, 2)
require.NoError(t, err)
assert.Equal(t, 3, v)
v, err = s.At(first, 3)
require.NoError(t, err)
assert.Equal(t, 1, v)
v, err = s.At(first, 4)
require.NoError(t, err)
assert.Equal(t, 123, v)
v, err = s.At(first, 5)
require.NoError(t, err)
assert.Equal(t, 1, v)
v, err = s.At(first, 6)
require.NoError(t, err)
assert.Equal(t, 3, v)
_, err = s.At(first, 7)
assert.ErrorContains(t, "index 7 out of bounds", err)
v, err = s.At(second, 0)
require.NoError(t, err)
assert.Equal(t, 123, v)
v, err = s.At(second, 1)
require.NoError(t, err)
assert.Equal(t, 2, v)
v, err = s.At(second, 2)
require.NoError(t, err)
assert.Equal(t, 3, v)
v, err = s.At(second, 3)
require.NoError(t, err)
assert.Equal(t, 123, v)
v, err = s.At(second, 4)
require.NoError(t, err)
assert.Equal(t, 2, v)
v, err = s.At(second, 5)
require.NoError(t, err)
assert.Equal(t, 2, v)
v, err = s.At(second, 6)
require.NoError(t, err)
assert.Equal(t, 3, v)
v, err = s.At(second, 7)
require.NoError(t, err)
assert.Equal(t, 2, v)
_, err = s.At(second, 8)
assert.ErrorContains(t, "index 8 out of bounds", err)
}
func TestUpdateAt(t *testing.T) {
// What we want to check:
// - shared value is updated only for the updated object, creating a new individual value (shared value remains the same)
// - individual value (different for both objects) is updated to a third value
// - individual value (different for both objects) is updated to the other object's value
// - individual value (equal for both objects) is updated
// - individual value existing only for the updated object is updated
// - individual value existing only for the other-object appends an item to the individual value
// - individual value updated to the original shared value removes that individual value
// - appended value (different for both objects) is updated to a third value
// - appended value (different for both objects) is updated to the other object's value
// - appended value (equal for both objects) is updated
// - appended value existing for one object is updated
// - ERROR when index too large in general
// - ERROR when index not too large in general, but too large for an object
s := setup()
first := &testObject{id: id1, slice: s}
second := &testObject{id: id2, slice: s}
require.NoError(t, s.UpdateAt(first, 0, 999))
assert.Equal(t, 123, s.sharedItems[0])
assertIndividualFound(t, s, first.id, 0, 999)
assertIndividualNotFound(t, s, second.id, 0)
require.NoError(t, s.UpdateAt(first, 1, 999))
assertIndividualFound(t, s, first.id, 1, 999)
assertIndividualFound(t, s, second.id, 1, 2)
require.NoError(t, s.UpdateAt(first, 1, 2))
assertIndividualFound(t, s, first.id, 1, 2)
assertIndividualFound(t, s, second.id, 1, 2)
require.NoError(t, s.UpdateAt(first, 2, 999))
assertIndividualFound(t, s, first.id, 2, 999)
assertIndividualFound(t, s, second.id, 2, 3)
require.NoError(t, s.UpdateAt(first, 3, 999))
assertIndividualFound(t, s, first.id, 3, 999)
assertIndividualNotFound(t, s, second.id, 3)
require.NoError(t, s.UpdateAt(first, 4, 999))
assertIndividualFound(t, s, first.id, 4, 999)
assertIndividualFound(t, s, second.id, 4, 2)
require.NoError(t, s.UpdateAt(first, 4, 123))
assertIndividualNotFound(t, s, first.id, 4)
assertIndividualFound(t, s, second.id, 4, 2)
require.NoError(t, s.UpdateAt(first, 5, 999))
assertAppendedFound(t, s, first.id, 0, 999)
assertAppendedFound(t, s, second.id, 0, 2)
require.NoError(t, s.UpdateAt(first, 5, 2))
assertAppendedFound(t, s, first.id, 0, 2)
assertAppendedFound(t, s, second.id, 0, 2)
require.NoError(t, s.UpdateAt(first, 6, 999))
assertAppendedFound(t, s, first.id, 1, 999)
assertAppendedFound(t, s, second.id, 1, 3)
// we update the second object because there are no more appended items for the first object
require.NoError(t, s.UpdateAt(second, 7, 999))
assertAppendedNotFound(t, s, first.id, 2)
assertAppendedFound(t, s, second.id, 2, 999)
assert.ErrorContains(t, "index 7 out of bounds", s.UpdateAt(first, 7, 999))
assert.ErrorContains(t, "index 8 out of bounds", s.UpdateAt(second, 8, 999))
}
func TestAppend(t *testing.T) {
// What we want to check:
// - appending first item ever to the slice
// - appending an item to an object when there is no corresponding item for the other object
// - appending an item to an object when there is a corresponding item with same value for the other object
// - appending an item to an object when there is a corresponding item with different value for the other object
// - we also want to check that cached length is properly updated after every append
// we want to start with the simplest slice possible
s := &Slice[int, *testObject]{}
s.Init([]int{0})
first := &testObject{id: id1, slice: s}
second := &testObject{id: id2, slice: s}
// append first value ever
s.Append(first, 1)
require.Equal(t, 1, len(s.appendedItems))
assertAppendedFound(t, s, first.id, 0, 1)
assertAppendedNotFound(t, s, second.id, 0)
l, ok := s.cachedLengths[first.id]
require.Equal(t, true, ok)
assert.Equal(t, 2, l)
_, ok = s.cachedLengths[second.id]
assert.Equal(t, false, ok)
// append one more value to the first object, so that we can test two append scenarios for the second object
s.Append(first, 1)
// append the first value to the second object, equal to the value for the first object
s.Append(second, 1)
require.Equal(t, 2, len(s.appendedItems))
assertAppendedFound(t, s, first.id, 0, 1)
assertAppendedFound(t, s, second.id, 0, 1)
l, ok = s.cachedLengths[first.id]
require.Equal(t, true, ok)
assert.Equal(t, 3, l)
l, ok = s.cachedLengths[second.id]
assert.Equal(t, true, ok)
assert.Equal(t, 2, l)
// append the first value to the second object, different than the value for the first object
s.Append(second, 2)
require.Equal(t, 2, len(s.appendedItems))
assertAppendedFound(t, s, first.id, 1, 1)
assertAppendedFound(t, s, second.id, 1, 2)
l, ok = s.cachedLengths[first.id]
require.Equal(t, true, ok)
assert.Equal(t, 3, l)
l, ok = s.cachedLengths[second.id]
assert.Equal(t, true, ok)
assert.Equal(t, 3, l)
}
func TestDetach(t *testing.T) {
// What we want to check:
// - no individual or appended items left after detaching an object
// - length removed from cache
s := setup()
obj := &testObject{id: id1, slice: s}
s.Detach(obj)
for _, item := range s.individualItems {
found := false
for _, v := range item.Values {
for _, o := range v.ids {
if o == obj.id {
found = true
}
}
}
assert.Equal(t, false, found)
}
for _, item := range s.appendedItems {
found := false
for _, v := range item.Values {
for _, o := range v.ids {
if o == obj.id {
found = true
}
}
}
assert.Equal(t, false, found)
}
_, ok := s.cachedLengths[obj.id]
assert.Equal(t, false, ok)
}
// Share the slice between 2 objects.
// Index 0: Shared value
// Index 1: Different individual value
// Index 2: Same individual value
// Index 3: Individual value ONLY for the first object
// Index 4: Individual value ONLY for the second object
// Index 5: Different appended value
// Index 6: Same appended value
// Index 7: Appended value ONLY for the second object
func setup() *Slice[int, *testObject] {
s := &Slice[int, *testObject]{}
s.Init([]int{123, 123, 123, 123, 123})
s.individualItems[1] = &MultiValueItem[int]{
Values: []*Value[int]{
{
val: 1,
ids: []uuid.UUID{id1},
},
{
val: 2,
ids: []uuid.UUID{id2},
},
},
}
s.individualItems[2] = &MultiValueItem[int]{
Values: []*Value[int]{
{
val: 3,
ids: []uuid.UUID{id1, id2},
},
},
}
s.individualItems[3] = &MultiValueItem[int]{
Values: []*Value[int]{
{
val: 1,
ids: []uuid.UUID{id1},
},
},
}
s.individualItems[4] = &MultiValueItem[int]{
Values: []*Value[int]{
{
val: 2,
ids: []uuid.UUID{id2},
},
},
}
s.appendedItems = []*MultiValueItem[int]{
{
Values: []*Value[int]{
{
val: 1,
ids: []uuid.UUID{id1},
},
{
val: 2,
ids: []uuid.UUID{id2},
},
},
},
{
Values: []*Value[int]{
{
val: 3,
ids: []uuid.UUID{id1, id2},
},
},
},
{
Values: []*Value[int]{
{
val: 2,
ids: []uuid.UUID{id2},
},
},
},
}
s.cachedLengths[id1] = 7
s.cachedLengths[id2] = 8
return s
}
func assertIndividualFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64, expected int) {
found := false
for _, v := range slice.individualItems[itemIndex].Values {
for _, o := range v.ids {
if o == id {
found = true
assert.Equal(t, expected, v.val)
}
}
}
assert.Equal(t, true, found)
}
func assertIndividualNotFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64) {
found := false
for _, v := range slice.individualItems[itemIndex].Values {
for _, o := range v.ids {
if o == id {
found = true
}
}
}
assert.Equal(t, false, found)
}
func assertAppendedFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64, expected int) {
found := false
for _, v := range slice.appendedItems[itemIndex].Values {
for _, o := range v.ids {
if o == id {
found = true
assert.Equal(t, expected, v.val)
}
}
}
assert.Equal(t, true, found)
}
func assertAppendedNotFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64) {
found := false
for _, v := range slice.appendedItems[itemIndex].Values {
for _, o := range v.ids {
if o == id {
found = true
}
}
}
assert.Equal(t, false, found)
}
func BenchmarkValue(b *testing.B) {
const _100k = 100000
const _1m = 1000000
const _10m = 10000000
b.Run("100,000 shared items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _100k))
for i := 0; i < b.N; i++ {
s.Value(&testObject{})
}
})
b.Run("100,000 equal individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _100k))
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
objs := make([]*testObject, _100k)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_100k)])
}
})
b.Run("100,000 different individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _100k))
objs := make([]*testObject, _100k)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_100k)])
}
})
b.Run("100,000 shared items and 100,000 equal appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _100k))
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
objs := make([]*testObject, _100k)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_100k)])
}
})
b.Run("100,000 shared items and 100,000 different appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _100k))
s.appendedItems = []*MultiValueItem[int]{}
objs := make([]*testObject, _100k)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_100k)])
}
})
b.Run("1,000,000 shared items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _1m))
for i := 0; i < b.N; i++ {
s.Value(&testObject{})
}
})
b.Run("1,000,000 equal individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _1m))
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
objs := make([]*testObject, _1m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_1m)])
}
})
b.Run("1,000,000 different individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _1m))
objs := make([]*testObject, _1m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_1m)])
}
})
b.Run("1,000,000 shared items and 1,000,000 equal appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _1m))
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
objs := make([]*testObject, _1m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_1m)])
}
})
b.Run("1,000,000 shared items and 1,000,000 different appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _1m))
s.appendedItems = []*MultiValueItem[int]{}
objs := make([]*testObject, _1m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_1m)])
}
})
b.Run("10,000,000 shared items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _10m))
for i := 0; i < b.N; i++ {
s.Value(&testObject{})
}
})
b.Run("10,000,000 equal individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _10m))
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
objs := make([]*testObject, _10m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_10m)])
}
})
b.Run("10,000,000 different individual items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _10m))
objs := make([]*testObject, _10m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_10m)])
}
})
b.Run("10,000,000 shared items and 10,000,000 equal appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _10m))
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
objs := make([]*testObject, _10m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_10m)])
}
})
b.Run("10,000,000 shared items and 10,000,000 different appended items", func(b *testing.B) {
s := &Slice[int, *testObject]{}
s.Init(make([]int, _10m))
s.appendedItems = []*MultiValueItem[int]{}
objs := make([]*testObject, _10m)
for i := 0; i < len(objs); i++ {
id := uuid.New()
objs[i] = &testObject{id: id, slice: s}
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
}
for i := 0; i < b.N; i++ {
s.Value(objs[rand.Intn(_10m)])
}
})
}

View File

@@ -255,9 +255,11 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot primiti
}
if v.emitAccountMetrics {
// There is no distinction between unknown and pending validators here.
// The balance is recorded as 0, as this metric is the effective balance of a participating validator.
for _, missingPubKey := range resp.MissingValidators {
fmtKey := fmt.Sprintf("%#x", missingPubKey)
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(float64(params.BeaconConfig().MaxEffectiveBalance) / float64(params.BeaconConfig().GweiPerEth))
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(0)
}
}