mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
22 Commits
deneb-reba
...
terence
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92a88e1ff2 | ||
|
|
f1c6f07e87 | ||
|
|
441b7ee4b7 | ||
|
|
b981477ae6 | ||
|
|
d2ff995eb2 | ||
|
|
56a0315dde | ||
|
|
535c1cb5c3 | ||
|
|
634133fedc | ||
|
|
6619063187 | ||
|
|
c1c1b7ecfa | ||
|
|
9a4670ec64 | ||
|
|
a664a07303 | ||
|
|
c5e0caf80c | ||
|
|
6f3dec79a4 | ||
|
|
99491be4a0 | ||
|
|
8f3f278edd | ||
|
|
e80bffb1f0 | ||
|
|
3058bc1661 | ||
|
|
830d331c6d | ||
|
|
0960fba6d1 | ||
|
|
2cec1b6699 | ||
|
|
439de222e2 |
@@ -71,6 +71,7 @@ go_library(
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/bls:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//math:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//proto/engine/v1:go_default_library",
|
||||
|
||||
@@ -3,6 +3,10 @@ package blockchain
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
time2 "time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
@@ -15,6 +19,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
|
||||
ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@@ -55,6 +60,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
|
||||
return err
|
||||
}
|
||||
|
||||
bf := bytes.NewBuffer([]byte{})
|
||||
if err := pprof.StartCPUProfile(bf); err != nil {
|
||||
log.WithError(err).Error("could not start cpu profile")
|
||||
}
|
||||
|
||||
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get block's prestate")
|
||||
@@ -163,6 +173,14 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
|
||||
log.WithError(err).Error("Unable to log state transition data")
|
||||
}
|
||||
|
||||
pprof.StopCPUProfile()
|
||||
if time.Since(receivedTime) > 3/2*time2.Second {
|
||||
dbPath := path.Join("/home/t", fmt.Sprintf("processing_%d_%d.profile", blockCopy.Block().Slot(), time.Since(receivedTime).Milliseconds()))
|
||||
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
|
||||
log.WithError(err).Error("could not write profile")
|
||||
}
|
||||
}
|
||||
|
||||
chainServiceProcessingTime.Observe(float64(time.Since(receivedTime).Milliseconds()))
|
||||
|
||||
return nil
|
||||
|
||||
@@ -97,7 +97,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
|
||||
"direction": conn.Stat().Direction,
|
||||
"multiAddr": peerMultiaddrString(conn),
|
||||
"activePeers": len(s.peers.Active()),
|
||||
}).Debug("Peer connected")
|
||||
}).Trace("Peer connected")
|
||||
}
|
||||
|
||||
// Do not perform handshake on inbound dials.
|
||||
@@ -173,7 +173,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
|
||||
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
|
||||
// Only log disconnections if we were fully connected.
|
||||
if priorState == peers.PeerConnected {
|
||||
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
|
||||
log.WithField("activePeers", len(s.peers.Active())).Trace("Peer disconnected")
|
||||
}
|
||||
}()
|
||||
},
|
||||
|
||||
@@ -24,6 +24,7 @@ go_library(
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/operations/voluntaryexits:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
"//beacon-chain/rpc/eth/beacon:go_default_library",
|
||||
"//beacon-chain/rpc/eth/builder:go_default_library",
|
||||
"//beacon-chain/rpc/eth/debug:go_default_library",
|
||||
|
||||
@@ -5,6 +5,7 @@ go_library(
|
||||
srcs = [
|
||||
"errors.go",
|
||||
"log.go",
|
||||
"service.go",
|
||||
"validator.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core",
|
||||
@@ -20,6 +21,7 @@ go_library(
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/sync:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
|
||||
18
beacon-chain/rpc/core/service.go
Normal file
18
beacon-chain/rpc/core/service.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
HeadFetcher blockchain.HeadFetcher
|
||||
GenesisTimeFetcher blockchain.TimeFetcher
|
||||
SyncChecker sync.Checker
|
||||
Broadcaster p2p.Broadcaster
|
||||
SyncCommitteePool synccommittee.Pool
|
||||
OperationNotifier opfeed.Notifier
|
||||
}
|
||||
@@ -5,10 +5,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
@@ -16,8 +14,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
@@ -48,18 +44,21 @@ func (e *AggregateBroadcastFailedError) Error() string {
|
||||
|
||||
// ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on
|
||||
// rewards and penalties throughout its lifecycle in the beacon chain.
|
||||
func ComputeValidatorPerformance(
|
||||
func (s *Service) ComputeValidatorPerformance(
|
||||
ctx context.Context,
|
||||
req *ethpb.ValidatorPerformanceRequest,
|
||||
headFetcher blockchain.HeadFetcher,
|
||||
currSlot primitives.Slot,
|
||||
) (*ethpb.ValidatorPerformanceResponse, *RpcError) {
|
||||
headState, err := headFetcher.HeadState(ctx)
|
||||
if s.SyncChecker.Syncing() {
|
||||
return nil, &RpcError{Reason: Unavailable, Err: errors.New("Syncing to latest head, not ready to respond")}
|
||||
}
|
||||
|
||||
headState, err := s.HeadFetcher.HeadState(ctx)
|
||||
if err != nil {
|
||||
return nil, &RpcError{Err: errors.Wrap(err, "could not get head state"), Reason: Internal}
|
||||
}
|
||||
currSlot := s.GenesisTimeFetcher.CurrentSlot()
|
||||
if currSlot > headState.Slot() {
|
||||
headRoot, err := headFetcher.HeadRoot(ctx)
|
||||
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
|
||||
if err != nil {
|
||||
return nil, &RpcError{Err: errors.Wrap(err, "could not get head root"), Reason: Internal}
|
||||
}
|
||||
@@ -200,21 +199,18 @@ func ComputeValidatorPerformance(
|
||||
|
||||
// SubmitSignedContributionAndProof is called by a sync committee aggregator
|
||||
// to submit signed contribution and proof object.
|
||||
func SubmitSignedContributionAndProof(
|
||||
func (s *Service) SubmitSignedContributionAndProof(
|
||||
ctx context.Context,
|
||||
s *ethpb.SignedContributionAndProof,
|
||||
broadcaster p2p.Broadcaster,
|
||||
pool synccommittee.Pool,
|
||||
notifier opfeed.Notifier,
|
||||
req *ethpb.SignedContributionAndProof,
|
||||
) *RpcError {
|
||||
errs, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// Broadcasting and saving contribution into the pool in parallel. As one fail should not affect another.
|
||||
errs.Go(func() error {
|
||||
return broadcaster.Broadcast(ctx, s)
|
||||
return s.Broadcaster.Broadcast(ctx, req)
|
||||
})
|
||||
|
||||
if err := pool.SaveSyncCommitteeContribution(s.Message.Contribution); err != nil {
|
||||
if err := s.SyncCommitteePool.SaveSyncCommitteeContribution(req.Message.Contribution); err != nil {
|
||||
return &RpcError{Err: err, Reason: Internal}
|
||||
}
|
||||
|
||||
@@ -224,10 +220,10 @@ func SubmitSignedContributionAndProof(
|
||||
return &RpcError{Err: err, Reason: Internal}
|
||||
}
|
||||
|
||||
notifier.OperationFeed().Send(&feed.Event{
|
||||
s.OperationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: opfeed.SyncCommitteeContributionReceived,
|
||||
Data: &opfeed.SyncCommitteeContributionReceivedData{
|
||||
Contribution: s,
|
||||
Contribution: req,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -235,11 +231,9 @@ func SubmitSignedContributionAndProof(
|
||||
}
|
||||
|
||||
// SubmitSignedAggregateSelectionProof verifies given aggregate and proofs and publishes them on appropriate gossipsub topic.
|
||||
func SubmitSignedAggregateSelectionProof(
|
||||
func (s *Service) SubmitSignedAggregateSelectionProof(
|
||||
ctx context.Context,
|
||||
req *ethpb.SignedAggregateSubmitRequest,
|
||||
genesisTime time.Time,
|
||||
broadcaster p2p.Broadcaster,
|
||||
) *RpcError {
|
||||
if req.SignedAggregateAndProof == nil || req.SignedAggregateAndProof.Message == nil ||
|
||||
req.SignedAggregateAndProof.Message.Aggregate == nil || req.SignedAggregateAndProof.Message.Aggregate.Data == nil {
|
||||
@@ -253,11 +247,11 @@ func SubmitSignedAggregateSelectionProof(
|
||||
|
||||
// As a preventive measure, a beacon node shouldn't broadcast an attestation whose slot is out of range.
|
||||
if err := helpers.ValidateAttestationTime(req.SignedAggregateAndProof.Message.Aggregate.Data.Slot,
|
||||
genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
|
||||
s.GenesisTimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
|
||||
return &RpcError{Err: errors.New("attestation slot is no longer valid from current time"), Reason: BadRequest}
|
||||
}
|
||||
|
||||
if err := broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
|
||||
if err := s.Broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
|
||||
return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal}
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,7 @@ go_test(
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
"//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library",
|
||||
"//beacon-chain/rpc/testutil:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
|
||||
@@ -105,7 +105,7 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
|
||||
http2.HandleError(w, "Could not convert request contribution to consensus contribution: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
rpcError := core.SubmitSignedContributionAndProof(r.Context(), consensusItem, s.Broadcaster, s.SyncCommitteePool, s.OperationNotifier)
|
||||
rpcError := s.CoreService.SubmitSignedContributionAndProof(r.Context(), consensusItem)
|
||||
if rpcError != nil {
|
||||
http2.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
|
||||
}
|
||||
@@ -131,8 +131,6 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
genesisTime := s.TimeFetcher.GenesisTime()
|
||||
|
||||
broadcastFailed := false
|
||||
for _, item := range req.Data {
|
||||
consensusItem, err := item.ToConsensus()
|
||||
@@ -140,11 +138,9 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
|
||||
http2.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
rpcError := core.SubmitSignedAggregateSelectionProof(
|
||||
rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(
|
||||
r.Context(),
|
||||
ðpbalpha.SignedAggregateSubmitRequest{SignedAggregateAndProof: consensusItem},
|
||||
genesisTime,
|
||||
s.Broadcaster,
|
||||
)
|
||||
if rpcError != nil {
|
||||
_, ok := rpcError.Err.(*core.AggregateBroadcastFailedError)
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
p2pmock "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
@@ -287,14 +288,16 @@ func TestGetAggregateAttestation_SameSlotAndRoot_ReturnMostAggregationBits(t *te
|
||||
}
|
||||
|
||||
func TestSubmitContributionAndProofs(t *testing.T) {
|
||||
s := &Server{
|
||||
c := &core.Service{
|
||||
OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(),
|
||||
}
|
||||
|
||||
s := &Server{CoreService: c}
|
||||
|
||||
t.Run("single", func(t *testing.T) {
|
||||
broadcaster := &p2pmock.MockBroadcaster{}
|
||||
s.Broadcaster = broadcaster
|
||||
s.SyncCommitteePool = synccommittee.NewStore()
|
||||
c.Broadcaster = broadcaster
|
||||
c.SyncCommitteePool = synccommittee.NewStore()
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err := body.WriteString(singleContribution)
|
||||
@@ -306,15 +309,15 @@ func TestSubmitContributionAndProofs(t *testing.T) {
|
||||
s.SubmitContributionAndProofs(writer, request)
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
assert.Equal(t, 1, len(broadcaster.BroadcastMessages))
|
||||
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(contributions))
|
||||
})
|
||||
|
||||
t.Run("multiple", func(t *testing.T) {
|
||||
broadcaster := &p2pmock.MockBroadcaster{}
|
||||
s.Broadcaster = broadcaster
|
||||
s.SyncCommitteePool = synccommittee.NewStore()
|
||||
c.Broadcaster = broadcaster
|
||||
c.SyncCommitteePool = synccommittee.NewStore()
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err := body.WriteString(multipleContributions)
|
||||
@@ -326,13 +329,13 @@ func TestSubmitContributionAndProofs(t *testing.T) {
|
||||
s.SubmitContributionAndProofs(writer, request)
|
||||
assert.Equal(t, http.StatusOK, writer.Code)
|
||||
assert.Equal(t, 2, len(broadcaster.BroadcastMessages))
|
||||
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, len(contributions))
|
||||
})
|
||||
|
||||
t.Run("invalid", func(t *testing.T) {
|
||||
s.SyncCommitteePool = synccommittee.NewStore()
|
||||
c.SyncCommitteePool = synccommittee.NewStore()
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err := body.WriteString(invalidContribution)
|
||||
@@ -349,7 +352,7 @@ func TestSubmitContributionAndProofs(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("no body", func(t *testing.T) {
|
||||
s.SyncCommitteePool = synccommittee.NewStore()
|
||||
c.SyncCommitteePool = synccommittee.NewStore()
|
||||
|
||||
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
|
||||
writer := httptest.NewRecorder()
|
||||
@@ -365,13 +368,17 @@ func TestSubmitContributionAndProofs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSubmitAggregateAndProofs(t *testing.T) {
|
||||
c := &core.Service{
|
||||
GenesisTimeFetcher: &mockChain.ChainService{},
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
TimeFetcher: &mockChain.ChainService{},
|
||||
CoreService: c,
|
||||
}
|
||||
|
||||
t.Run("single", func(t *testing.T) {
|
||||
broadcaster := &p2pmock.MockBroadcaster{}
|
||||
s.Broadcaster = broadcaster
|
||||
c.Broadcaster = broadcaster
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err := body.WriteString(singleAggregate)
|
||||
@@ -387,8 +394,8 @@ func TestSubmitAggregateAndProofs(t *testing.T) {
|
||||
|
||||
t.Run("multiple", func(t *testing.T) {
|
||||
broadcaster := &p2pmock.MockBroadcaster{}
|
||||
s.Broadcaster = broadcaster
|
||||
s.SyncCommitteePool = synccommittee.NewStore()
|
||||
c.Broadcaster = broadcaster
|
||||
c.SyncCommitteePool = synccommittee.NewStore()
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err := body.WriteString(multipleAggregates)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@@ -32,4 +33,5 @@ type Server struct {
|
||||
BeaconDB db.HeadAccessDatabase
|
||||
BlockBuilder builder.BlockBuilder
|
||||
OperationNotifier operation.Notifier
|
||||
CoreService *core.Service
|
||||
}
|
||||
|
||||
@@ -103,6 +103,7 @@ go_test(
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/slashings:go_default_library",
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//beacon-chain/state/state-native:go_default_library",
|
||||
"//beacon-chain/state/stategen:go_default_library",
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@@ -47,4 +48,5 @@ type Server struct {
|
||||
SyncChecker sync.Checker
|
||||
ReplayerBuilder stategen.ReplayerBuilder
|
||||
OptimisticModeFetcher blockchain.OptimisticModeFetcher
|
||||
CoreService *core.Service
|
||||
}
|
||||
|
||||
@@ -659,11 +659,7 @@ func (bs *Server) GetValidatorQueue(
|
||||
func (bs *Server) GetValidatorPerformance(
|
||||
ctx context.Context, req *ethpb.ValidatorPerformanceRequest,
|
||||
) (*ethpb.ValidatorPerformanceResponse, error) {
|
||||
if bs.SyncChecker.Syncing() {
|
||||
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
|
||||
}
|
||||
currSlot := bs.GenesisTimeFetcher.CurrentSlot()
|
||||
response, err := core.ComputeValidatorPerformance(ctx, req, bs.HeadFetcher, currSlot)
|
||||
response, err := bs.CoreService.ComputeValidatorPerformance(ctx, req)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not compute validator performance: %v", err.Err)
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
|
||||
dbTest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
|
||||
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
@@ -1797,7 +1798,9 @@ func TestGetValidatorPerformance_Syncing(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
bs := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: true},
|
||||
CoreService: &core.Service{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: true},
|
||||
},
|
||||
}
|
||||
|
||||
wanted := "Syncing to latest head, not ready to respond"
|
||||
@@ -1857,11 +1860,13 @@ func TestGetValidatorPerformance_OK(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := ðpb.ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
|
||||
@@ -1918,12 +1923,14 @@ func TestGetValidatorPerformance_Indices(t *testing.T) {
|
||||
require.NoError(t, headState.SetValidators(validators))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
}
|
||||
c := headState.Copy()
|
||||
vp, bp, err := precompute.New(ctx, c)
|
||||
@@ -1988,12 +1995,14 @@ func TestGetValidatorPerformance_IndicesPubkeys(t *testing.T) {
|
||||
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
}
|
||||
c := headState.Copy()
|
||||
vp, bp, err := precompute.New(ctx, c)
|
||||
@@ -2064,11 +2073,13 @@ func TestGetValidatorPerformanceAltair_OK(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := ðpb.ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
|
||||
@@ -2132,11 +2143,13 @@ func TestGetValidatorPerformanceBellatrix_OK(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := ðpb.ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
|
||||
@@ -2200,11 +2213,13 @@ func TestGetValidatorPerformanceCapella_OK(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
bs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := ðpb.ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKey2[:], publicKey3[:]},
|
||||
|
||||
@@ -76,6 +76,7 @@ go_library(
|
||||
"//crypto/rand:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//encoding/ssz:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//network/forks:go_default_library",
|
||||
"//proto/engine/v1:go_default_library",
|
||||
|
||||
@@ -113,7 +113,7 @@ func (vs *Server) SubmitSignedAggregateSelectionProof(
|
||||
ctx context.Context,
|
||||
req *ethpb.SignedAggregateSubmitRequest,
|
||||
) (*ethpb.SignedAggregateSubmitResponse, error) {
|
||||
if err := core.SubmitSignedAggregateSelectionProof(ctx, req, vs.TimeFetcher.GenesisTime(), vs.P2P); err != nil {
|
||||
if err := vs.CoreService.SubmitSignedAggregateSelectionProof(ctx, req); err != nil {
|
||||
return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not submit aggregate: %v", err.Err)
|
||||
}
|
||||
return ðpb.SignedAggregateSubmitResponse{}, nil
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
|
||||
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
@@ -431,7 +432,11 @@ func TestSubmitSignedAggregateSelectionProof_ZeroHashesSignatures(t *testing.T)
|
||||
|
||||
func TestSubmitSignedAggregateSelectionProof_InvalidSlot(t *testing.T) {
|
||||
c := &mock.ChainService{Genesis: time.Now()}
|
||||
aggregatorServer := &Server{TimeFetcher: c}
|
||||
aggregatorServer := &Server{
|
||||
CoreService: &core.Service{
|
||||
GenesisTimeFetcher: c,
|
||||
},
|
||||
}
|
||||
req := ðpb.SignedAggregateSubmitRequest{
|
||||
SignedAggregateAndProof: ðpb.SignedAggregateAttestationAndProof{
|
||||
Signature: []byte{'a'},
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -12,6 +15,8 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
emptypb "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/builder"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
@@ -25,6 +30,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -42,6 +48,17 @@ const (
|
||||
eth1dataTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
blockProposalTimeMilliseconds = promauto.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "block_proposal_time_milliseconds",
|
||||
Help: "Total time in milliseconds to propose a block",
|
||||
})
|
||||
localPayloadValue = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "local_payload_value_gwei",
|
||||
Help: "The value of the local payload",
|
||||
})
|
||||
)
|
||||
|
||||
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
|
||||
// by passing in the slot and the signed randao reveal of the slot.
|
||||
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
|
||||
@@ -58,6 +75,12 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
"sinceSlotStartTime": time.Since(t),
|
||||
}).Info("Begin building block")
|
||||
|
||||
bf := bytes.NewBuffer([]byte{})
|
||||
startTime := time.Now()
|
||||
if err := pprof.StartCPUProfile(bf); err != nil {
|
||||
log.WithError(err).Error("could not start cpu profile")
|
||||
}
|
||||
|
||||
// A syncing validator should not produce a block.
|
||||
if vs.SyncChecker.Syncing() {
|
||||
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
|
||||
@@ -169,6 +192,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
"validator": sBlk.Block().ProposerIndex(),
|
||||
}).Info("Finished building block")
|
||||
|
||||
blockProposalTimeMilliseconds.Observe(float64(time.Since(t).Milliseconds()))
|
||||
|
||||
pprof.StopCPUProfile()
|
||||
if time.Since(startTime) > 500*time.Millisecond {
|
||||
dbPath := path.Join("/home/t", fmt.Sprintf("%d_%d_proposer.profile", req.Slot, time.Since(startTime).Milliseconds()))
|
||||
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
|
||||
log.WithError(err).Error("could not write profile")
|
||||
}
|
||||
}
|
||||
|
||||
pb, err := sBlk.Block().Proto()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not convert block to proto: %v", err)
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
@@ -15,7 +14,6 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation"
|
||||
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -83,26 +81,9 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
func (a proposerAtts) filter(ctx context.Context, st state.BeaconState) (proposerAtts, proposerAtts) {
|
||||
validAtts := make([]*ethpb.Attestation, 0, len(a))
|
||||
invalidAtts := make([]*ethpb.Attestation, 0, len(a))
|
||||
var attestationProcessor func(context.Context, state.BeaconState, *ethpb.Attestation) (state.BeaconState, error)
|
||||
|
||||
if st.Version() == version.Phase0 {
|
||||
attestationProcessor = blocks.ProcessAttestationNoVerifySignature
|
||||
} else if st.Version() >= version.Altair {
|
||||
// Use a wrapper here, as go needs strong typing for the function signature.
|
||||
attestationProcessor = func(ctx context.Context, st state.BeaconState, attestation *ethpb.Attestation) (state.BeaconState, error) {
|
||||
totalBalance, err := helpers.TotalActiveBalance(st)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return altair.ProcessAttestationNoVerifySignature(ctx, st, attestation, totalBalance)
|
||||
}
|
||||
} else {
|
||||
// Exit early if there is an unknown state type.
|
||||
return validAtts, invalidAtts
|
||||
}
|
||||
|
||||
for _, att := range a {
|
||||
if _, err := attestationProcessor(ctx, st, att); err == nil {
|
||||
if err := blocks.VerifyAttestationNoVerifySignature(ctx, st, att); err == nil {
|
||||
validAtts = append(validAtts, att)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -53,6 +53,11 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc
|
||||
if localPayload == nil {
|
||||
return errors.New("local payload is nil")
|
||||
}
|
||||
localValueGwei, err := localPayload.ValueInGwei()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get local payload value")
|
||||
}
|
||||
localPayloadValue.Set(float64(localValueGwei))
|
||||
|
||||
// Use local payload if builder payload is nil.
|
||||
if builderPayload == nil {
|
||||
|
||||
@@ -54,7 +54,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
|
||||
slot := blk.Slot()
|
||||
vIdx := blk.ProposerIndex()
|
||||
headRoot := blk.ParentRoot()
|
||||
proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
|
||||
_, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
|
||||
feeRecipient := params.BeaconConfig().DefaultFeeRecipient
|
||||
recipient, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, vIdx)
|
||||
switch err == nil {
|
||||
@@ -76,7 +76,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
|
||||
return nil, errors.Wrap(err, "could not get fee recipient in db")
|
||||
}
|
||||
|
||||
if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
|
||||
if ok && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
|
||||
var pid [8]byte
|
||||
copy(pid[:], payloadId[:])
|
||||
payloadIDCacheHit.Inc()
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
@@ -30,6 +31,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/forks"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
@@ -74,6 +76,7 @@ type Server struct {
|
||||
BlockBuilder builder.BlockBuilder
|
||||
BLSChangesPool blstoexec.PoolManager
|
||||
ClockWaiter startup.ClockWaiter
|
||||
CoreService *core.Service
|
||||
}
|
||||
|
||||
// WaitForActivation checks if a validator public key exists in the active validator registry of the current
|
||||
@@ -184,3 +187,32 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
|
||||
}
|
||||
return stream.Send(res)
|
||||
}
|
||||
|
||||
func (vs *Server) ProposeEverySlot() {
|
||||
for vs.TimeFetcher.GenesisTime().IsZero() {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
genTime := vs.TimeFetcher.GenesisTime()
|
||||
|
||||
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
case <-vs.Ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case slot := <-ticker.C():
|
||||
curr := time.Now()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
_, err := vs.GetBeaconBlock(context.Background(), ðpb.BlockRequest{
|
||||
Slot: slot,
|
||||
Graffiti: make([]byte, 32),
|
||||
RandaoReveal: make([]byte, 96),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
log.Infof("Successfully produced block %d in %s", slot, time.Since(curr).String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ func (vs *Server) GetSyncCommitteeContribution(
|
||||
func (vs *Server) SubmitSignedContributionAndProof(
|
||||
ctx context.Context, s *ethpb.SignedContributionAndProof,
|
||||
) (*emptypb.Empty, error) {
|
||||
err := core.SubmitSignedContributionAndProof(ctx, s, vs.P2P, vs.SyncCommitteePool, vs.OperationNotifier)
|
||||
err := vs.CoreService.SubmitSignedContributionAndProof(ctx, s)
|
||||
if err != nil {
|
||||
return &emptypb.Empty{}, status.Errorf(core.ErrorReasonToGRPC(err.Reason), err.Err.Error())
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
@@ -137,9 +138,11 @@ func TestGetSyncCommitteeContribution_FiltersDuplicates(t *testing.T) {
|
||||
|
||||
func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
|
||||
server := &Server{
|
||||
SyncCommitteePool: synccommittee.NewStore(),
|
||||
P2P: &mockp2p.MockBroadcaster{},
|
||||
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
|
||||
CoreService: &core.Service{
|
||||
SyncCommitteePool: synccommittee.NewStore(),
|
||||
Broadcaster: &mockp2p.MockBroadcaster{},
|
||||
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
|
||||
},
|
||||
}
|
||||
contribution := ðpb.SignedContributionAndProof{
|
||||
Message: ðpb.ContributionAndProof{
|
||||
@@ -151,21 +154,23 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
|
||||
}
|
||||
_, err := server.SubmitSignedContributionAndProof(context.Background(), contribution)
|
||||
require.NoError(t, err)
|
||||
savedMsgs, err := server.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
savedMsgs, err := server.CoreService.SyncCommitteePool.SyncCommitteeContributions(1)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, []*ethpb.SyncCommitteeContribution{contribution.Message.Contribution}, savedMsgs)
|
||||
}
|
||||
|
||||
func TestSubmitSignedContributionAndProof_Notification(t *testing.T) {
|
||||
server := &Server{
|
||||
SyncCommitteePool: synccommittee.NewStore(),
|
||||
P2P: &mockp2p.MockBroadcaster{},
|
||||
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
|
||||
CoreService: &core.Service{
|
||||
SyncCommitteePool: synccommittee.NewStore(),
|
||||
Broadcaster: &mockp2p.MockBroadcaster{},
|
||||
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
|
||||
},
|
||||
}
|
||||
|
||||
// Subscribe to operation notifications.
|
||||
opChannel := make(chan *feed.Event, 1024)
|
||||
opSub := server.OperationNotifier.OperationFeed().Subscribe(opChannel)
|
||||
opSub := server.CoreService.OperationNotifier.OperationFeed().Subscribe(opChannel)
|
||||
defer opSub.Unsubscribe()
|
||||
|
||||
contribution := ðpb.SignedContributionAndProof{
|
||||
|
||||
@@ -26,6 +26,7 @@ go_test(
|
||||
"//beacon-chain/blockchain/testing:go_default_library",
|
||||
"//beacon-chain/core/epoch/precompute:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//beacon-chain/sync/initial-sync/testing:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
|
||||
@@ -2,6 +2,7 @@ package validator
|
||||
|
||||
import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
)
|
||||
|
||||
@@ -11,4 +12,5 @@ type Server struct {
|
||||
GenesisTimeFetcher blockchain.TimeFetcher
|
||||
SyncChecker sync.Checker
|
||||
HeadFetcher blockchain.HeadFetcher
|
||||
CoreService *core.Service
|
||||
}
|
||||
|
||||
@@ -29,12 +29,6 @@ type ValidatorPerformanceResponse struct {
|
||||
|
||||
// GetValidatorPerformance is an HTTP handler for GetValidatorPerformance.
|
||||
func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request) {
|
||||
if vs.SyncChecker.Syncing() {
|
||||
handleHTTPError(w, "Syncing", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
ctx := r.Context()
|
||||
currSlot := vs.GenesisTimeFetcher.CurrentSlot()
|
||||
var req ValidatorPerformanceRequest
|
||||
if r.Body != http.NoBody {
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
@@ -42,14 +36,12 @@ func (vs *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
}
|
||||
computed, err := core.ComputeValidatorPerformance(
|
||||
ctx,
|
||||
computed, err := vs.CoreService.ComputeValidatorPerformance(
|
||||
r.Context(),
|
||||
ðpb.ValidatorPerformanceRequest{
|
||||
PublicKeys: req.PublicKeys,
|
||||
Indices: req.Indices,
|
||||
},
|
||||
vs.HeadFetcher,
|
||||
currSlot,
|
||||
)
|
||||
if err != nil {
|
||||
handleHTTPError(w, "Could not compute validator performance: "+err.Err.Error(), core.ErrorReasonToHTTP(err.Reason))
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
@@ -28,12 +29,13 @@ import (
|
||||
func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
t.Run("Syncing", func(t *testing.T) {
|
||||
vs := &Server{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: true},
|
||||
CoreService: &core.Service{
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: true},
|
||||
},
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
srv := httptest.NewServer(http.HandlerFunc(vs.GetValidatorPerformance))
|
||||
req := httptest.NewRequest("POST", "/foo", &buf)
|
||||
req := httptest.NewRequest("POST", "/foo", nil)
|
||||
|
||||
client := &http.Client{}
|
||||
rawResp, err := client.Post(srv.URL, "application/json", req.Body)
|
||||
@@ -57,11 +59,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := &ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
|
||||
@@ -111,12 +115,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
}
|
||||
c := headState.Copy()
|
||||
vp, bp, err := precompute.New(ctx, c)
|
||||
@@ -174,12 +180,14 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
// 10 epochs into the future.
|
||||
State: headState,
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
}
|
||||
c := headState.Copy()
|
||||
vp, bp, err := precompute.New(ctx, c)
|
||||
@@ -243,11 +251,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := &ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
|
||||
@@ -303,11 +313,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := &ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
|
||||
@@ -363,11 +375,13 @@ func TestServer_GetValidatorPerformance(t *testing.T) {
|
||||
require.NoError(t, headState.SetBalances([]uint64{100, 101, 102}))
|
||||
offset := int64(headState.Slot().Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
vs := &Server{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
CoreService: &core.Service{
|
||||
HeadFetcher: &mock.ChainService{
|
||||
State: headState,
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||
}
|
||||
want := &ValidatorPerformanceResponse{
|
||||
PublicKeys: [][]byte{publicKeys[1][:], publicKeys[2][:]},
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/beacon"
|
||||
rpcBuilder "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/builder"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/debug"
|
||||
@@ -231,6 +232,15 @@ func (s *Service) Start() {
|
||||
}
|
||||
s.cfg.Router.HandleFunc("/eth/v1/builder/states/{state_id}/expected_withdrawals", builderServer.ExpectedWithdrawals).Methods(http.MethodGet)
|
||||
|
||||
coreService := &core.Service{
|
||||
HeadFetcher: s.cfg.HeadFetcher,
|
||||
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
|
||||
SyncChecker: s.cfg.SyncService,
|
||||
Broadcaster: s.cfg.Broadcaster,
|
||||
SyncCommitteePool: s.cfg.SyncCommitteeObjectPool,
|
||||
OperationNotifier: s.cfg.OperationNotifier,
|
||||
}
|
||||
|
||||
validatorServer := &validatorv1alpha1.Server{
|
||||
Ctx: s.ctx,
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
@@ -266,6 +276,7 @@ func (s *Service) Start() {
|
||||
BlockBuilder: s.cfg.BlockBuilder,
|
||||
BLSChangesPool: s.cfg.BLSChangesPool,
|
||||
ClockWaiter: s.cfg.ClockWaiter,
|
||||
CoreService: coreService,
|
||||
}
|
||||
validatorServerV1 := &validator.Server{
|
||||
HeadFetcher: s.cfg.HeadFetcher,
|
||||
@@ -283,6 +294,7 @@ func (s *Service) Start() {
|
||||
BeaconDB: s.cfg.BeaconDB,
|
||||
BlockBuilder: s.cfg.BlockBuilder,
|
||||
OperationNotifier: s.cfg.OperationNotifier,
|
||||
CoreService: coreService,
|
||||
}
|
||||
|
||||
s.cfg.Router.HandleFunc("/eth/v1/validator/aggregate_attestation", validatorServerV1.GetAggregateAttestation).Methods(http.MethodGet)
|
||||
@@ -354,6 +366,7 @@ func (s *Service) Start() {
|
||||
ReceivedAttestationsBuffer: make(chan *ethpbv1alpha1.Attestation, attestationBufferSize),
|
||||
CollectedAttestationsBuffer: make(chan []*ethpbv1alpha1.Attestation, attestationBufferSize),
|
||||
ReplayerBuilder: ch,
|
||||
CoreService: coreService,
|
||||
}
|
||||
beaconChainServerV1 := &beacon.Server{
|
||||
CanonicalHistory: ch,
|
||||
@@ -384,6 +397,7 @@ func (s *Service) Start() {
|
||||
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
|
||||
HeadFetcher: s.cfg.HeadFetcher,
|
||||
SyncChecker: s.cfg.SyncService,
|
||||
CoreService: coreService,
|
||||
}
|
||||
s.cfg.Router.HandleFunc("/prysm/validators/performance", httpServer.GetValidatorPerformance).Methods(http.MethodPost)
|
||||
s.cfg.Router.HandleFunc("/eth/v2/beacon/blocks", beaconChainServerV1.PublishBlockV2).Methods(http.MethodPost)
|
||||
@@ -429,6 +443,8 @@ func (s *Service) Start() {
|
||||
// Register reflection service on gRPC server.
|
||||
reflection.Register(s.grpcServer)
|
||||
|
||||
go validatorServer.ProposeEverySlot()
|
||||
|
||||
go func() {
|
||||
if s.listener != nil {
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
|
||||
@@ -210,7 +210,7 @@ func (q *blocksQueue) loop() {
|
||||
"epoch": slots.ToEpoch(fsm.start),
|
||||
"start": fsm.start,
|
||||
"error": err.Error(),
|
||||
}).Debug("Can not trigger event")
|
||||
}).Trace("Can not trigger event")
|
||||
if errors.Is(err, errNoRequiredPeers) {
|
||||
forceExit := q.exitConditions.noRequiredPeersErrRetries > noRequiredPeersErrMaxRetries
|
||||
if q.mode == modeStopOnFinalizedEpoch || forceExit {
|
||||
|
||||
@@ -189,7 +189,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
|
||||
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return
|
||||
@@ -209,7 +209,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
|
||||
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return
|
||||
|
||||
@@ -45,8 +45,10 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
log := log.WithField("Reason", goodbyeMessage(*m))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
|
||||
if goodbyeMessage(*m) != "client has too many peers" {
|
||||
log := log.WithField("Reason", goodbyeMessage(*m))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
|
||||
}
|
||||
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
|
||||
// closes all streams with the peer
|
||||
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
|
||||
@@ -87,7 +89,7 @@ func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code p2ptypes.RP
|
||||
log.WithFields(logrus.Fields{
|
||||
"error": err,
|
||||
"peer": id,
|
||||
}).Debug("Could not send goodbye message to peer")
|
||||
}).Trace("Could not send goodbye message to peer")
|
||||
}
|
||||
return s.cfg.p2p.Disconnect(id)
|
||||
}
|
||||
|
||||
23
cache/nonblocking/lru.go
vendored
23
cache/nonblocking/lru.go
vendored
@@ -18,6 +18,7 @@ type LRU[K comparable, V any] struct {
|
||||
evictList *lruList[K, V]
|
||||
items map[K]*entry[K, V]
|
||||
onEvict EvictCallback[K, V]
|
||||
getChan chan *entry[K, V]
|
||||
}
|
||||
|
||||
// NewLRU constructs an LRU of the given size
|
||||
@@ -25,13 +26,19 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K,
|
||||
if size <= 0 {
|
||||
return nil, errors.New("must provide a positive size")
|
||||
}
|
||||
// Initialize the channel buffer size as being 10% of the cache size.
|
||||
chanSize := size / 10
|
||||
|
||||
c := &LRU[K, V]{
|
||||
size: size,
|
||||
evictList: newList[K, V](),
|
||||
items: make(map[K]*entry[K, V]),
|
||||
onEvict: onEvict,
|
||||
getChan: make(chan *entry[K, V], chanSize),
|
||||
}
|
||||
// Spin off separate go-routine to handle evict list
|
||||
// operations.
|
||||
go c.handleGetRequests()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -77,12 +84,7 @@ func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
|
||||
c.itemsLock.RUnlock()
|
||||
|
||||
// Make this get function non-blocking for multiple readers.
|
||||
go func() {
|
||||
c.evictListLock.Lock()
|
||||
c.evictList.moveToFront(ent)
|
||||
c.evictListLock.Unlock()
|
||||
}()
|
||||
|
||||
c.getChan <- ent
|
||||
return ent.value, true
|
||||
}
|
||||
c.itemsLock.RUnlock()
|
||||
@@ -133,3 +135,12 @@ func (c *LRU[K, V]) removeElement(e *entry[K, V]) {
|
||||
c.onEvict(e.key, e.value)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LRU[K, V]) handleGetRequests() {
|
||||
for {
|
||||
entry := <-c.getChan
|
||||
c.evictListLock.Lock()
|
||||
c.evictList.moveToFront(entry)
|
||||
c.evictListLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
|
||||
log.WithError(err).Debug("Could not decode goodbye stream message")
|
||||
return
|
||||
}
|
||||
log.WithError(err).Debug("Could not decode stream message")
|
||||
log.WithError(err).Trace("Could not decode stream message")
|
||||
return
|
||||
}
|
||||
if err := handle(context.Background(), msg, stream); err != nil {
|
||||
@@ -89,7 +89,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).Debug("Could not decode stream message")
|
||||
log.WithError(err).Trace("Could not decode stream message")
|
||||
return
|
||||
}
|
||||
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {
|
||||
|
||||
@@ -230,9 +230,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
|
||||
logEnabled(disableBuildBlockParallel)
|
||||
cfg.BuildBlockParallel = false
|
||||
}
|
||||
if ctx.IsSet(aggregateParallel.Name) {
|
||||
logEnabled(aggregateParallel)
|
||||
cfg.AggregateParallel = true
|
||||
cfg.AggregateParallel = true
|
||||
if ctx.IsSet(disableAggregateParallel.Name) {
|
||||
logEnabled(disableAggregateParallel)
|
||||
cfg.AggregateParallel = false
|
||||
}
|
||||
if ctx.IsSet(disableResourceManager.Name) {
|
||||
logEnabled(disableResourceManager)
|
||||
|
||||
@@ -38,6 +38,11 @@ var (
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
deprecatedAggregateParallel = &cli.BoolFlag{
|
||||
Name: "aggregate-parallel",
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
)
|
||||
|
||||
// Deprecated flags for both the beacon node and validator client.
|
||||
@@ -48,6 +53,7 @@ var deprecatedFlags = []cli.Flag{
|
||||
deprecatedDisableGossipBatchAggregation,
|
||||
deprecatedBuildBlockParallel,
|
||||
deprecatedEnableRegistrationCache,
|
||||
deprecatedAggregateParallel,
|
||||
}
|
||||
|
||||
// deprecatedBeaconFlags contains flags that are still used by other components
|
||||
|
||||
@@ -155,9 +155,9 @@ var (
|
||||
Usage: "A temporary flag for disabling the validator registration cache instead of using the db. note: registrations do not clear on restart while using the db",
|
||||
}
|
||||
|
||||
aggregateParallel = &cli.BoolFlag{
|
||||
Name: "aggregate-parallel",
|
||||
Usage: "Enables parallel aggregation of attestations",
|
||||
disableAggregateParallel = &cli.BoolFlag{
|
||||
Name: "disable-aggregate-parallel",
|
||||
Usage: "Disables parallel aggregation of attestations",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -212,7 +212,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
|
||||
aggregateThirdInterval,
|
||||
disableResourceManager,
|
||||
DisableRegistrationCache,
|
||||
aggregateParallel,
|
||||
disableAggregateParallel,
|
||||
}...)...)
|
||||
|
||||
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
||||
|
||||
23
container/multi-value-slice/BUILD.bazel
Normal file
23
container/multi-value-slice/BUILD.bazel
Normal file
@@ -0,0 +1,23 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["multi_value_slice.go"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/container/multi-value-slice",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//container/multi-value-slice/interfaces:go_default_library",
|
||||
"@com_github_google_uuid//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["multi_value_slice_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"@com_github_google_uuid//:go_default_library",
|
||||
],
|
||||
)
|
||||
9
container/multi-value-slice/interfaces/BUILD.bazel
Normal file
9
container/multi-value-slice/interfaces/BUILD.bazel
Normal file
@@ -0,0 +1,9 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["interfaces.go"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/container/multi-value-slice/interfaces",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["@com_github_google_uuid//:go_default_library"],
|
||||
)
|
||||
9
container/multi-value-slice/interfaces/interfaces.go
Normal file
9
container/multi-value-slice/interfaces/interfaces.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package interfaces
|
||||
|
||||
import "github.com/google/uuid"
|
||||
|
||||
// Identifiable represents an object that can be uniquely identified by its Id.
|
||||
type Identifiable interface {
|
||||
Id() uuid.UUID
|
||||
SetId(id uuid.UUID)
|
||||
}
|
||||
488
container/multi-value-slice/multi_value_slice.go
Normal file
488
container/multi-value-slice/multi_value_slice.go
Normal file
@@ -0,0 +1,488 @@
|
||||
// Package mvslice defines a multi value slice container. The purpose of the container is to be a replacement for a slice
|
||||
// in scenarios where many objects of the same type share a copy of an identical or nearly identical slice.
|
||||
// In such case using the multi value slice should result in less memory allocation because many values of the slice can be shared between objects.
|
||||
//
|
||||
// The multi value slice should be initialized by calling the Init function and passing the initial values of the slice.
|
||||
// After initializing the slice, it can be shared between object by using the Copy function.
|
||||
// Note that simply assigning the same multi value slice to several objects is not enough for it to work properly.
|
||||
// Calling Copy is required in most circumstances (an exception is when the source object has only shared values).
|
||||
//
|
||||
// s := &Slice[int, *testObject]{}
|
||||
// s.Init([]int{1, 2, 3})
|
||||
// src := &testObject{id: id1, slice: s} // id1 is some UUID
|
||||
// dst := &testObject{id: id2, slice: s} // id2 is some UUID
|
||||
// s.Copy(src, dst)
|
||||
//
|
||||
// Each Value stores a value of type V along with identifiers to objects that have this value.
|
||||
// A MultiValueItem is a slice of Value elements. A Slice contains shared items, individual items and appended items.
|
||||
//
|
||||
// You can think of a shared value as the original value (i.e. the value at the point in time when the multi value slice was constructed),
|
||||
// and of an individual value as a changed value.
|
||||
// There is no notion of a shared appended value because appended values never have an original value (appended values are empty when the slice is created).
|
||||
//
|
||||
// Whenever any of the slice’s functions (apart from Init) is called, the function needs to know which object it is dealing with.
|
||||
// This is because if an object has an individual/appended value, the function must get/set/change this particular value instead of the shared value
|
||||
// or another individual/appended value.
|
||||
//
|
||||
// The way appended items are stored is as follows. Let’s say appended items were a regular slice that is initially empty,
|
||||
// and we append an item for object0 and then append another item for object1.
|
||||
// Now we have two items in the slice, but object1 only has an item in index 1. This makes things very confusing and hard to deal with.
|
||||
// If we make appended items a []*Value, things don’t become much better.
|
||||
// It is therefore easiest to make appended items a []*MultiValueItem, which allows each object to have its own values starting at index 0
|
||||
// and not having any “gaps”.
|
||||
//
|
||||
// The Detach function should be called when an object gets garbage collected.
|
||||
// Its purpose is to clean up the slice from individual/appended values of the collected object.
|
||||
// Otherwise the slice will get polluted with values for non-existing objects.
|
||||
//
|
||||
// Example diagram illustrating what happens after copying, updating and detaching:
|
||||
//
|
||||
// Create object o1 with value 10. At this point we only have a shared value.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 |
|
||||
//
|
||||
// Copy object o1 to object o2. o2 shares the value with o1, no individual value is created.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 |
|
||||
//
|
||||
// Update value of object o2 to 20. An individual value is created.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 | 20: [o2]
|
||||
//
|
||||
// Copy object o2 to object o3. The individual value's object list is updated.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 | 20: [o2,o3]
|
||||
//
|
||||
// Update value of object o3 to 30. There are two individual values now, one for o2 and one for o3.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 | 20: [o2]
|
||||
// | 30: [o3]
|
||||
//
|
||||
// Update value of object o2 to 10. o2 no longer has an individual value
|
||||
// because it got "reverted" to the original, shared value,
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 | 30: [o3]
|
||||
//
|
||||
// Detach object o3. Individual value for o3 is removed.
|
||||
//
|
||||
// ===================
|
||||
// shared | individual
|
||||
// ===================
|
||||
// 10 |
|
||||
package mvslice
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/prysmaticlabs/prysm/v4/container/multi-value-slice/interfaces"
|
||||
)
|
||||
|
||||
// MultiValueSlice defines an abstraction over all concrete implementations of the generic Slice.
|
||||
type MultiValueSlice[O interfaces.Identifiable] interface {
|
||||
Len(obj O) uuid.UUID
|
||||
}
|
||||
|
||||
// Value defines a single value along with one or more IDs that share this value.
|
||||
type Value[V any] struct {
|
||||
val V
|
||||
ids []uuid.UUID
|
||||
}
|
||||
|
||||
// MultiValueItem defines a collection of Value items.
|
||||
type MultiValueItem[V any] struct {
|
||||
Values []*Value[V]
|
||||
}
|
||||
|
||||
// Slice is the main component of the multi-value slice data structure. It has two type parameters:
|
||||
// - V comparable - the type of values stored the slice. The constraint is required
|
||||
// because certain operations (e.g. updating, appending) have to compare values against each other.
|
||||
// - O interfaces.Identifiable - the type of objects sharing the slice. The constraint is required
|
||||
// because we need a way to compare objects against each other in order to know which objects
|
||||
// values should be accessed.
|
||||
type Slice[V comparable, O interfaces.Identifiable] struct {
|
||||
sharedItems []V
|
||||
individualItems map[uint64]*MultiValueItem[V]
|
||||
appendedItems []*MultiValueItem[V]
|
||||
cachedLengths map[uuid.UUID]int
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// Init initializes the slice with sensible defaults. Input values are assigned to shared items.
|
||||
func (s *Slice[V, O]) Init(items []V) {
|
||||
s.sharedItems = items
|
||||
s.individualItems = map[uint64]*MultiValueItem[V]{}
|
||||
s.appendedItems = []*MultiValueItem[V]{}
|
||||
s.cachedLengths = map[uuid.UUID]int{}
|
||||
}
|
||||
|
||||
// Len returns the number of items for the input object.
|
||||
func (s *Slice[V, O]) Len(obj O) int {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
l, ok := s.cachedLengths[obj.Id()]
|
||||
if !ok {
|
||||
return len(s.sharedItems)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// Copy copies items between the source and destination.
|
||||
func (s *Slice[V, O]) Copy(src O, dst O) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
for _, item := range s.individualItems {
|
||||
for _, v := range item.Values {
|
||||
_, found := containsId(v.ids, src.Id())
|
||||
if found {
|
||||
v.ids = append(v.ids, dst.Id())
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, item := range s.appendedItems {
|
||||
found := false
|
||||
for _, v := range item.Values {
|
||||
_, found = containsId(v.ids, src.Id())
|
||||
if found {
|
||||
v.ids = append(v.ids, dst.Id())
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// This is an optimization. If we didn't find an appended item at index i,
|
||||
// then all larger indices don't have an appended item for the object either.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
srcLen, ok := s.cachedLengths[src.Id()]
|
||||
if ok {
|
||||
s.cachedLengths[dst.Id()] = srcLen
|
||||
}
|
||||
}
|
||||
|
||||
// Value returns all items for the input object.
|
||||
func (s *Slice[V, O]) Value(obj O) []V {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
l, ok := s.cachedLengths[obj.Id()]
|
||||
if ok {
|
||||
result := make([]V, l)
|
||||
s.fillOriginalItems(obj, &result)
|
||||
|
||||
sharedLen := len(s.sharedItems)
|
||||
for i, item := range s.appendedItems {
|
||||
found := false
|
||||
for _, v := range item.Values {
|
||||
_, found = containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
result[sharedLen+i] = v.val
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// This is an optimization. If we didn't find an appended item at index i,
|
||||
// then all larger indices don't have an appended item for the object either.
|
||||
return result
|
||||
}
|
||||
}
|
||||
return result
|
||||
} else {
|
||||
result := make([]V, len(s.sharedItems))
|
||||
s.fillOriginalItems(obj, &result)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
// At returns the item at the requested index for the input object.
|
||||
// Appended items' indices are always larger than shared/individual items' indices.
|
||||
// We first check if the index is within the length of shared items.
|
||||
// If it is, then we return an individual value at that index - if it exists - or a shared value otherwise.
|
||||
// If the index is beyond the length of shared values, it is an appended item and that's what gets returned.
|
||||
func (s *Slice[V, O]) At(obj O, index uint64) (V, error) {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
if index >= uint64(len(s.sharedItems)+len(s.appendedItems)) {
|
||||
var def V
|
||||
return def, fmt.Errorf("index %d out of bounds", index)
|
||||
}
|
||||
|
||||
isOriginal := index < uint64(len(s.sharedItems))
|
||||
if isOriginal {
|
||||
ind, ok := s.individualItems[index]
|
||||
if !ok {
|
||||
return s.sharedItems[index], nil
|
||||
}
|
||||
for _, v := range ind.Values {
|
||||
for _, id := range v.ids {
|
||||
if id == obj.Id() {
|
||||
return v.val, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return s.sharedItems[index], nil
|
||||
} else {
|
||||
item := s.appendedItems[index-uint64(len(s.sharedItems))]
|
||||
for _, v := range item.Values {
|
||||
for _, id := range v.ids {
|
||||
if id == obj.Id() {
|
||||
return v.val, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
var def V
|
||||
return def, fmt.Errorf("index %d out of bounds", index)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateAt updates the item at the required index for the input object to the passed in value.
|
||||
func (s *Slice[V, O]) UpdateAt(obj O, index uint64, val V) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if index >= uint64(len(s.sharedItems)+len(s.appendedItems)) {
|
||||
return fmt.Errorf("index %d out of bounds", index)
|
||||
}
|
||||
|
||||
isOriginal := index < uint64(len(s.sharedItems))
|
||||
if isOriginal {
|
||||
s.updateOriginalItem(obj, index, val)
|
||||
return nil
|
||||
}
|
||||
return s.updateAppendedItem(obj, index, val)
|
||||
}
|
||||
|
||||
// Append adds a new item to the input object.
|
||||
func (s *Slice[V, O]) Append(obj O, val V) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if len(s.appendedItems) == 0 {
|
||||
s.appendedItems = append(s.appendedItems, &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}})
|
||||
s.cachedLengths[obj.Id()] = len(s.sharedItems) + 1
|
||||
return
|
||||
}
|
||||
|
||||
for _, item := range s.appendedItems {
|
||||
found := false
|
||||
for _, v := range item.Values {
|
||||
_, found = containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
newValue := true
|
||||
for _, v := range item.Values {
|
||||
if v.val == val {
|
||||
v.ids = append(v.ids, obj.Id())
|
||||
newValue = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if newValue {
|
||||
item.Values = append(item.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
|
||||
}
|
||||
|
||||
l, ok := s.cachedLengths[obj.Id()]
|
||||
if ok {
|
||||
s.cachedLengths[obj.Id()] = l + 1
|
||||
} else {
|
||||
s.cachedLengths[obj.Id()] = len(s.sharedItems) + 1
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
s.appendedItems = append(s.appendedItems, &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}})
|
||||
|
||||
s.cachedLengths[obj.Id()] = s.cachedLengths[obj.Id()] + 1
|
||||
}
|
||||
|
||||
// Detach removes the input object from the multi-value slice.
|
||||
// What this means in practice is that we remove all individual and appended values for that object and clear the cached length.
|
||||
func (s *Slice[V, O]) Detach(obj O) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
for i, ind := range s.individualItems {
|
||||
for vi, v := range ind.Values {
|
||||
foundIndex, found := containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
if len(v.ids) == 1 {
|
||||
if len(ind.Values) == 1 {
|
||||
delete(s.individualItems, i)
|
||||
} else {
|
||||
ind.Values = deleteElemFromSlice(ind.Values, vi)
|
||||
}
|
||||
} else {
|
||||
v.ids = deleteElemFromSlice(v.ids, foundIndex)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, item := range s.appendedItems {
|
||||
found := false
|
||||
for vi, v := range item.Values {
|
||||
var foundIndex int
|
||||
foundIndex, found = containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
if len(v.ids) == 1 {
|
||||
item.Values = deleteElemFromSlice(item.Values, vi)
|
||||
} else {
|
||||
v.ids = deleteElemFromSlice(v.ids, foundIndex)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// This is an optimization. If we didn't find an appended item at index i,
|
||||
// then all larger indices don't have an appended item for the object either.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
delete(s.cachedLengths, obj.Id())
|
||||
}
|
||||
|
||||
func (s *Slice[V, O]) fillOriginalItems(obj O, items *[]V) {
|
||||
for i, item := range s.sharedItems {
|
||||
ind, ok := s.individualItems[uint64(i)]
|
||||
if !ok {
|
||||
(*items)[i] = item
|
||||
} else {
|
||||
found := false
|
||||
for _, v := range ind.Values {
|
||||
_, found = containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
(*items)[i] = v.val
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
(*items)[i] = item
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Slice[V, O]) updateOriginalItem(obj O, index uint64, val V) {
|
||||
ind, ok := s.individualItems[index]
|
||||
if ok {
|
||||
for mvi, v := range ind.Values {
|
||||
// if we find an existing value, we remove it
|
||||
foundIndex, found := containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
if len(v.ids) == 1 {
|
||||
// There is an improvement to be made here. If len(ind.Values) == 1,
|
||||
// then after removing the item from the slice s.individualItems[i]
|
||||
// will be a useless map entry whose value is an empty slice.
|
||||
ind.Values = deleteElemFromSlice(ind.Values, mvi)
|
||||
} else {
|
||||
v.ids = deleteElemFromSlice(v.ids, foundIndex)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if val == s.sharedItems[index] {
|
||||
return
|
||||
}
|
||||
|
||||
if !ok {
|
||||
s.individualItems[index] = &MultiValueItem[V]{Values: []*Value[V]{{val: val, ids: []uuid.UUID{obj.Id()}}}}
|
||||
} else {
|
||||
newValue := true
|
||||
for _, v := range ind.Values {
|
||||
if v.val == val {
|
||||
v.ids = append(v.ids, obj.Id())
|
||||
newValue = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if newValue {
|
||||
ind.Values = append(ind.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Slice[V, O]) updateAppendedItem(obj O, index uint64, val V) error {
|
||||
item := s.appendedItems[index-uint64(len(s.sharedItems))]
|
||||
found := false
|
||||
for vi, v := range item.Values {
|
||||
var foundIndex int
|
||||
// if we find an existing value, we remove it
|
||||
foundIndex, found = containsId(v.ids, obj.Id())
|
||||
if found {
|
||||
if len(v.ids) == 1 {
|
||||
item.Values = deleteElemFromSlice(item.Values, vi)
|
||||
} else {
|
||||
v.ids = deleteElemFromSlice(v.ids, foundIndex)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("index %d out of bounds", index)
|
||||
}
|
||||
|
||||
newValue := true
|
||||
for _, v := range item.Values {
|
||||
if v.val == val {
|
||||
v.ids = append(v.ids, obj.Id())
|
||||
newValue = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if newValue {
|
||||
item.Values = append(item.Values, &Value[V]{val: val, ids: []uuid.UUID{obj.Id()}})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func containsId(ids []uuid.UUID, wanted uuid.UUID) (int, bool) {
|
||||
for i, id := range ids {
|
||||
if id == wanted {
|
||||
return i, true
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// deleteElemFromSlice does not relocate the slice, but it also does not preserve the order of items.
|
||||
// This is not a problem here because the order of values in a MultiValueItem and object IDs doesn't matter.
|
||||
func deleteElemFromSlice[T any](s []T, i int) []T {
|
||||
s[i] = s[len(s)-1] // Copy last element to index i.
|
||||
s = s[:len(s)-1] // Truncate slice.
|
||||
return s
|
||||
}
|
||||
664
container/multi-value-slice/multi_value_slice_test.go
Normal file
664
container/multi-value-slice/multi_value_slice_test.go
Normal file
@@ -0,0 +1,664 @@
|
||||
package mvslice
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
)
|
||||
|
||||
var (
|
||||
id1 = uuid.New()
|
||||
id2 = uuid.New()
|
||||
id999 = uuid.New()
|
||||
)
|
||||
|
||||
type testObject struct {
|
||||
id uuid.UUID
|
||||
slice *Slice[int, *testObject]
|
||||
}
|
||||
|
||||
func (o *testObject) Id() uuid.UUID {
|
||||
return o.id
|
||||
}
|
||||
|
||||
func (o *testObject) SetId(id uuid.UUID) {
|
||||
o.id = id
|
||||
}
|
||||
|
||||
func TestLen(t *testing.T) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init([]int{1, 2, 3})
|
||||
id := uuid.New()
|
||||
s.cachedLengths[id] = 123
|
||||
t.Run("cached", func(t *testing.T) {
|
||||
assert.Equal(t, 123, s.Len(&testObject{id: id}))
|
||||
})
|
||||
t.Run("not cached", func(t *testing.T) {
|
||||
assert.Equal(t, 3, s.Len(&testObject{id: uuid.New()}))
|
||||
})
|
||||
}
|
||||
|
||||
func TestCopy(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - shared value is copied
|
||||
// - when the source object has an individual value, it is copied
|
||||
// - when the source object does not have an individual value, the shared value is copied
|
||||
// - when the source object has an appended value, it is copied
|
||||
// - when the source object does not have an appended value, nothing is copied
|
||||
// - length of destination object is cached
|
||||
|
||||
s := setup()
|
||||
src := &testObject{id: id1, slice: s}
|
||||
dst := &testObject{id: id999, slice: s}
|
||||
|
||||
s.Copy(src, dst)
|
||||
|
||||
assert.Equal(t, (*MultiValueItem[int])(nil), dst.slice.individualItems[0])
|
||||
assertIndividualFound(t, s, dst.id, 1, 1)
|
||||
assertIndividualFound(t, s, dst.id, 2, 3)
|
||||
assertIndividualFound(t, s, dst.id, 3, 1)
|
||||
assertIndividualNotFound(t, s, dst.id, 4)
|
||||
assertAppendedFound(t, s, dst.id, 0, 1)
|
||||
assertAppendedFound(t, s, dst.id, 1, 3)
|
||||
assertAppendedNotFound(t, s, dst.id, 2)
|
||||
l, ok := s.cachedLengths[id999]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, 7, l)
|
||||
}
|
||||
|
||||
func TestValue(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - correct values are returned for first object
|
||||
// - correct values are returned for second object
|
||||
// - correct values are returned for an object without appended items
|
||||
|
||||
s := setup()
|
||||
first := &testObject{id: id1, slice: s}
|
||||
second := &testObject{id: id2, slice: s}
|
||||
|
||||
v := s.Value(first)
|
||||
|
||||
require.Equal(t, 7, len(v))
|
||||
assert.Equal(t, 123, v[0])
|
||||
assert.Equal(t, 1, v[1])
|
||||
assert.Equal(t, 3, v[2])
|
||||
assert.Equal(t, 1, v[3])
|
||||
assert.Equal(t, 123, v[4])
|
||||
assert.Equal(t, 1, v[5])
|
||||
assert.Equal(t, 3, v[6])
|
||||
|
||||
v = s.Value(second)
|
||||
|
||||
require.Equal(t, 8, len(v))
|
||||
assert.Equal(t, 123, v[0])
|
||||
assert.Equal(t, 2, v[1])
|
||||
assert.Equal(t, 3, v[2])
|
||||
assert.Equal(t, 123, v[3])
|
||||
assert.Equal(t, 2, v[4])
|
||||
assert.Equal(t, 2, v[5])
|
||||
assert.Equal(t, 3, v[6])
|
||||
assert.Equal(t, 2, v[7])
|
||||
|
||||
s = &Slice[int, *testObject]{}
|
||||
s.Init([]int{1, 2, 3})
|
||||
id := uuid.New()
|
||||
|
||||
v = s.Value(&testObject{id: id})
|
||||
|
||||
require.Equal(t, 3, len(v))
|
||||
assert.Equal(t, 1, v[0])
|
||||
assert.Equal(t, 2, v[1])
|
||||
assert.Equal(t, 3, v[2])
|
||||
}
|
||||
|
||||
func TestAt(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - correct values are returned for first object
|
||||
// - correct values are returned for second object
|
||||
// - ERROR when index too large in general
|
||||
// - ERROR when index not too large in general, but too large for an object
|
||||
|
||||
s := setup()
|
||||
first := &testObject{id: id1, slice: s}
|
||||
second := &testObject{id: id2, slice: s}
|
||||
|
||||
v, err := s.At(first, 0)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 123, v)
|
||||
v, err = s.At(first, 1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, v)
|
||||
v, err = s.At(first, 2)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, v)
|
||||
v, err = s.At(first, 3)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, v)
|
||||
v, err = s.At(first, 4)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 123, v)
|
||||
v, err = s.At(first, 5)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, v)
|
||||
v, err = s.At(first, 6)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, v)
|
||||
_, err = s.At(first, 7)
|
||||
assert.ErrorContains(t, "index 7 out of bounds", err)
|
||||
|
||||
v, err = s.At(second, 0)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 123, v)
|
||||
v, err = s.At(second, 1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, v)
|
||||
v, err = s.At(second, 2)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, v)
|
||||
v, err = s.At(second, 3)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 123, v)
|
||||
v, err = s.At(second, 4)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, v)
|
||||
v, err = s.At(second, 5)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, v)
|
||||
v, err = s.At(second, 6)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, v)
|
||||
v, err = s.At(second, 7)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, v)
|
||||
_, err = s.At(second, 8)
|
||||
assert.ErrorContains(t, "index 8 out of bounds", err)
|
||||
}
|
||||
|
||||
func TestUpdateAt(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - shared value is updated only for the updated object, creating a new individual value (shared value remains the same)
|
||||
// - individual value (different for both objects) is updated to a third value
|
||||
// - individual value (different for both objects) is updated to the other object's value
|
||||
// - individual value (equal for both objects) is updated
|
||||
// - individual value existing only for the updated object is updated
|
||||
// - individual value existing only for the other-object appends an item to the individual value
|
||||
// - individual value updated to the original shared value removes that individual value
|
||||
// - appended value (different for both objects) is updated to a third value
|
||||
// - appended value (different for both objects) is updated to the other object's value
|
||||
// - appended value (equal for both objects) is updated
|
||||
// - appended value existing for one object is updated
|
||||
// - ERROR when index too large in general
|
||||
// - ERROR when index not too large in general, but too large for an object
|
||||
|
||||
s := setup()
|
||||
first := &testObject{id: id1, slice: s}
|
||||
second := &testObject{id: id2, slice: s}
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 0, 999))
|
||||
assert.Equal(t, 123, s.sharedItems[0])
|
||||
assertIndividualFound(t, s, first.id, 0, 999)
|
||||
assertIndividualNotFound(t, s, second.id, 0)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 1, 999))
|
||||
assertIndividualFound(t, s, first.id, 1, 999)
|
||||
assertIndividualFound(t, s, second.id, 1, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 1, 2))
|
||||
assertIndividualFound(t, s, first.id, 1, 2)
|
||||
assertIndividualFound(t, s, second.id, 1, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 2, 999))
|
||||
assertIndividualFound(t, s, first.id, 2, 999)
|
||||
assertIndividualFound(t, s, second.id, 2, 3)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 3, 999))
|
||||
assertIndividualFound(t, s, first.id, 3, 999)
|
||||
assertIndividualNotFound(t, s, second.id, 3)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 4, 999))
|
||||
assertIndividualFound(t, s, first.id, 4, 999)
|
||||
assertIndividualFound(t, s, second.id, 4, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 4, 123))
|
||||
assertIndividualNotFound(t, s, first.id, 4)
|
||||
assertIndividualFound(t, s, second.id, 4, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 5, 999))
|
||||
assertAppendedFound(t, s, first.id, 0, 999)
|
||||
assertAppendedFound(t, s, second.id, 0, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 5, 2))
|
||||
assertAppendedFound(t, s, first.id, 0, 2)
|
||||
assertAppendedFound(t, s, second.id, 0, 2)
|
||||
|
||||
require.NoError(t, s.UpdateAt(first, 6, 999))
|
||||
assertAppendedFound(t, s, first.id, 1, 999)
|
||||
assertAppendedFound(t, s, second.id, 1, 3)
|
||||
|
||||
// we update the second object because there are no more appended items for the first object
|
||||
require.NoError(t, s.UpdateAt(second, 7, 999))
|
||||
assertAppendedNotFound(t, s, first.id, 2)
|
||||
assertAppendedFound(t, s, second.id, 2, 999)
|
||||
|
||||
assert.ErrorContains(t, "index 7 out of bounds", s.UpdateAt(first, 7, 999))
|
||||
assert.ErrorContains(t, "index 8 out of bounds", s.UpdateAt(second, 8, 999))
|
||||
}
|
||||
|
||||
func TestAppend(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - appending first item ever to the slice
|
||||
// - appending an item to an object when there is no corresponding item for the other object
|
||||
// - appending an item to an object when there is a corresponding item with same value for the other object
|
||||
// - appending an item to an object when there is a corresponding item with different value for the other object
|
||||
// - we also want to check that cached length is properly updated after every append
|
||||
|
||||
// we want to start with the simplest slice possible
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init([]int{0})
|
||||
first := &testObject{id: id1, slice: s}
|
||||
second := &testObject{id: id2, slice: s}
|
||||
|
||||
// append first value ever
|
||||
s.Append(first, 1)
|
||||
require.Equal(t, 1, len(s.appendedItems))
|
||||
assertAppendedFound(t, s, first.id, 0, 1)
|
||||
assertAppendedNotFound(t, s, second.id, 0)
|
||||
l, ok := s.cachedLengths[first.id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, 2, l)
|
||||
_, ok = s.cachedLengths[second.id]
|
||||
assert.Equal(t, false, ok)
|
||||
|
||||
// append one more value to the first object, so that we can test two append scenarios for the second object
|
||||
s.Append(first, 1)
|
||||
|
||||
// append the first value to the second object, equal to the value for the first object
|
||||
s.Append(second, 1)
|
||||
require.Equal(t, 2, len(s.appendedItems))
|
||||
assertAppendedFound(t, s, first.id, 0, 1)
|
||||
assertAppendedFound(t, s, second.id, 0, 1)
|
||||
l, ok = s.cachedLengths[first.id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, 3, l)
|
||||
l, ok = s.cachedLengths[second.id]
|
||||
assert.Equal(t, true, ok)
|
||||
assert.Equal(t, 2, l)
|
||||
|
||||
// append the first value to the second object, different than the value for the first object
|
||||
s.Append(second, 2)
|
||||
require.Equal(t, 2, len(s.appendedItems))
|
||||
assertAppendedFound(t, s, first.id, 1, 1)
|
||||
assertAppendedFound(t, s, second.id, 1, 2)
|
||||
l, ok = s.cachedLengths[first.id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, 3, l)
|
||||
l, ok = s.cachedLengths[second.id]
|
||||
assert.Equal(t, true, ok)
|
||||
assert.Equal(t, 3, l)
|
||||
}
|
||||
|
||||
func TestDetach(t *testing.T) {
|
||||
// What we want to check:
|
||||
// - no individual or appended items left after detaching an object
|
||||
// - length removed from cache
|
||||
|
||||
s := setup()
|
||||
obj := &testObject{id: id1, slice: s}
|
||||
|
||||
s.Detach(obj)
|
||||
|
||||
for _, item := range s.individualItems {
|
||||
found := false
|
||||
for _, v := range item.Values {
|
||||
for _, o := range v.ids {
|
||||
if o == obj.id {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, false, found)
|
||||
}
|
||||
for _, item := range s.appendedItems {
|
||||
found := false
|
||||
for _, v := range item.Values {
|
||||
for _, o := range v.ids {
|
||||
if o == obj.id {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, false, found)
|
||||
}
|
||||
_, ok := s.cachedLengths[obj.id]
|
||||
assert.Equal(t, false, ok)
|
||||
}
|
||||
|
||||
// Share the slice between 2 objects.
|
||||
// Index 0: Shared value
|
||||
// Index 1: Different individual value
|
||||
// Index 2: Same individual value
|
||||
// Index 3: Individual value ONLY for the first object
|
||||
// Index 4: Individual value ONLY for the second object
|
||||
// Index 5: Different appended value
|
||||
// Index 6: Same appended value
|
||||
// Index 7: Appended value ONLY for the second object
|
||||
func setup() *Slice[int, *testObject] {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init([]int{123, 123, 123, 123, 123})
|
||||
s.individualItems[1] = &MultiValueItem[int]{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 1,
|
||||
ids: []uuid.UUID{id1},
|
||||
},
|
||||
{
|
||||
val: 2,
|
||||
ids: []uuid.UUID{id2},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.individualItems[2] = &MultiValueItem[int]{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 3,
|
||||
ids: []uuid.UUID{id1, id2},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.individualItems[3] = &MultiValueItem[int]{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 1,
|
||||
ids: []uuid.UUID{id1},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.individualItems[4] = &MultiValueItem[int]{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 2,
|
||||
ids: []uuid.UUID{id2},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.appendedItems = []*MultiValueItem[int]{
|
||||
{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 1,
|
||||
ids: []uuid.UUID{id1},
|
||||
},
|
||||
{
|
||||
val: 2,
|
||||
ids: []uuid.UUID{id2},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 3,
|
||||
ids: []uuid.UUID{id1, id2},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Values: []*Value[int]{
|
||||
{
|
||||
val: 2,
|
||||
ids: []uuid.UUID{id2},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
s.cachedLengths[id1] = 7
|
||||
s.cachedLengths[id2] = 8
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func assertIndividualFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64, expected int) {
|
||||
found := false
|
||||
for _, v := range slice.individualItems[itemIndex].Values {
|
||||
for _, o := range v.ids {
|
||||
if o == id {
|
||||
found = true
|
||||
assert.Equal(t, expected, v.val)
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, true, found)
|
||||
}
|
||||
|
||||
func assertIndividualNotFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64) {
|
||||
found := false
|
||||
for _, v := range slice.individualItems[itemIndex].Values {
|
||||
for _, o := range v.ids {
|
||||
if o == id {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, false, found)
|
||||
}
|
||||
|
||||
func assertAppendedFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64, expected int) {
|
||||
found := false
|
||||
for _, v := range slice.appendedItems[itemIndex].Values {
|
||||
for _, o := range v.ids {
|
||||
if o == id {
|
||||
found = true
|
||||
assert.Equal(t, expected, v.val)
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, true, found)
|
||||
}
|
||||
|
||||
func assertAppendedNotFound(t *testing.T, slice *Slice[int, *testObject], id uuid.UUID, itemIndex uint64) {
|
||||
found := false
|
||||
for _, v := range slice.appendedItems[itemIndex].Values {
|
||||
for _, o := range v.ids {
|
||||
if o == id {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.Equal(t, false, found)
|
||||
}
|
||||
|
||||
func BenchmarkValue(b *testing.B) {
|
||||
const _100k = 100000
|
||||
const _1m = 1000000
|
||||
const _10m = 10000000
|
||||
|
||||
b.Run("100,000 shared items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _100k))
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(&testObject{})
|
||||
}
|
||||
})
|
||||
b.Run("100,000 equal individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _100k))
|
||||
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
|
||||
objs := make([]*testObject, _100k)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_100k)])
|
||||
}
|
||||
})
|
||||
b.Run("100,000 different individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _100k))
|
||||
objs := make([]*testObject, _100k)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_100k)])
|
||||
}
|
||||
})
|
||||
b.Run("100,000 shared items and 100,000 equal appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _100k))
|
||||
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
|
||||
objs := make([]*testObject, _100k)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_100k)])
|
||||
}
|
||||
})
|
||||
b.Run("100,000 shared items and 100,000 different appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _100k))
|
||||
s.appendedItems = []*MultiValueItem[int]{}
|
||||
objs := make([]*testObject, _100k)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_100k)])
|
||||
}
|
||||
})
|
||||
b.Run("1,000,000 shared items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _1m))
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(&testObject{})
|
||||
}
|
||||
})
|
||||
b.Run("1,000,000 equal individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _1m))
|
||||
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
|
||||
objs := make([]*testObject, _1m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_1m)])
|
||||
}
|
||||
})
|
||||
b.Run("1,000,000 different individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _1m))
|
||||
objs := make([]*testObject, _1m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_1m)])
|
||||
}
|
||||
})
|
||||
b.Run("1,000,000 shared items and 1,000,000 equal appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _1m))
|
||||
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
|
||||
objs := make([]*testObject, _1m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_1m)])
|
||||
}
|
||||
})
|
||||
b.Run("1,000,000 shared items and 1,000,000 different appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _1m))
|
||||
s.appendedItems = []*MultiValueItem[int]{}
|
||||
objs := make([]*testObject, _1m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_1m)])
|
||||
}
|
||||
})
|
||||
b.Run("10,000,000 shared items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _10m))
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(&testObject{})
|
||||
}
|
||||
})
|
||||
b.Run("10,000,000 equal individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _10m))
|
||||
s.individualItems[0] = &MultiValueItem[int]{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}
|
||||
objs := make([]*testObject, _10m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[0].Values[0].ids = append(s.individualItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_10m)])
|
||||
}
|
||||
})
|
||||
b.Run("10,000,000 different individual items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _10m))
|
||||
objs := make([]*testObject, _10m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.individualItems[uint64(i)] = &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}}
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_10m)])
|
||||
}
|
||||
})
|
||||
b.Run("10,000,000 shared items and 10,000,000 equal appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _10m))
|
||||
s.appendedItems = []*MultiValueItem[int]{{Values: []*Value[int]{{val: 999, ids: []uuid.UUID{}}}}}
|
||||
objs := make([]*testObject, _10m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems[0].Values[0].ids = append(s.appendedItems[0].Values[0].ids, id)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_10m)])
|
||||
}
|
||||
})
|
||||
b.Run("10,000,000 shared items and 10,000,000 different appended items", func(b *testing.B) {
|
||||
s := &Slice[int, *testObject]{}
|
||||
s.Init(make([]int, _10m))
|
||||
s.appendedItems = []*MultiValueItem[int]{}
|
||||
objs := make([]*testObject, _10m)
|
||||
for i := 0; i < len(objs); i++ {
|
||||
id := uuid.New()
|
||||
objs[i] = &testObject{id: id, slice: s}
|
||||
s.appendedItems = append(s.appendedItems, &MultiValueItem[int]{Values: []*Value[int]{{val: i, ids: []uuid.UUID{id}}}})
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.Value(objs[rand.Intn(_10m)])
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -255,9 +255,11 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot primiti
|
||||
}
|
||||
|
||||
if v.emitAccountMetrics {
|
||||
// There is no distinction between unknown and pending validators here.
|
||||
// The balance is recorded as 0, as this metric is the effective balance of a participating validator.
|
||||
for _, missingPubKey := range resp.MissingValidators {
|
||||
fmtKey := fmt.Sprintf("%#x", missingPubKey)
|
||||
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(float64(params.BeaconConfig().MaxEffectiveBalance) / float64(params.BeaconConfig().GweiPerEth))
|
||||
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(0)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user