Compare commits

...

16 Commits

Author SHA1 Message Date
terence tsao
92a88e1ff2 Merge branch 'proposer-verify-attestation' of github.com:prysmaticlabs/prysm into terence 2023-08-08 14:39:39 -07:00
terence tsao
f1c6f07e87 Proposer metrics 2023-08-08 06:43:41 -07:00
terence tsao
441b7ee4b7 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-08 06:35:26 -07:00
terence tsao
b981477ae6 Fix(proposer): verify attestations without mutating state 2023-08-07 07:59:50 -07:00
terence tsao
535c1cb5c3 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-05 14:01:37 -07:00
terence tsao
6619063187 Merge branch 'develop' of github.com:prysmaticlabs/prysm into terence 2023-08-04 12:40:29 -07:00
terence tsao
c5e0caf80c Rm unused 2023-08-03 13:32:08 -07:00
Potuz
6f3dec79a4 block when updating caches 2023-08-03 13:16:42 -07:00
Potuz
99491be4a0 update NSC together with epoch boundary caches 2023-08-03 13:16:42 -07:00
terence tsao
8f3f278edd Don't panic 2023-08-03 13:16:42 -07:00
terence tsao
e80bffb1f0 Add profile 2023-08-03 13:16:42 -07:00
terence tsao
3058bc1661 Change logs 2023-08-03 13:16:42 -07:00
terencechain
830d331c6d Remove: span for convert to indexed attestation (#12687)
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2023-08-03 13:16:42 -07:00
Nishant Das
0960fba6d1 Minor Optimization on InnerShuffleList (#12690)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2023-08-03 13:16:42 -07:00
terencechain
2cec1b6699 Fix: use correct context for UpdateCommitteeCache (#12691) 2023-08-03 13:16:42 -07:00
terence tsao
439de222e2 Propose every slot and profile 2023-08-03 13:14:06 -07:00
14 changed files with 105 additions and 32 deletions

View File

@@ -71,6 +71,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -3,6 +3,10 @@ package blockchain
import (
"bytes"
"context"
"fmt"
"path"
"runtime/pprof"
time2 "time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
@@ -15,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/io/file"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -55,6 +60,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
return err
}
bf := bytes.NewBuffer([]byte{})
if err := pprof.StartCPUProfile(bf); err != nil {
log.WithError(err).Error("could not start cpu profile")
}
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
if err != nil {
return errors.Wrap(err, "could not get block's prestate")
@@ -163,6 +173,14 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
log.WithError(err).Error("Unable to log state transition data")
}
pprof.StopCPUProfile()
if time.Since(receivedTime) > 3/2*time2.Second {
dbPath := path.Join("/home/t", fmt.Sprintf("processing_%d_%d.profile", blockCopy.Block().Slot(), time.Since(receivedTime).Milliseconds()))
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
log.WithError(err).Error("could not write profile")
}
}
chainServiceProcessingTime.Observe(float64(time.Since(receivedTime).Milliseconds()))
return nil

View File

@@ -97,7 +97,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
"direction": conn.Stat().Direction,
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}).Trace("Peer connected")
}
// Do not perform handshake on inbound dials.
@@ -173,7 +173,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
log.WithField("activePeers", len(s.peers.Active())).Trace("Peer disconnected")
}
}()
},

View File

@@ -76,6 +76,7 @@ go_library(
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//encoding/ssz:go_default_library",
"//io/file:go_default_library",
"//monitoring/tracing:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",

View File

@@ -1,9 +1,12 @@
package validator
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"path"
"runtime/pprof"
"strings"
"sync"
"time"
@@ -12,6 +15,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
@@ -25,6 +30,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/io/file"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
@@ -42,6 +48,17 @@ const (
eth1dataTimeout = 2 * time.Second
)
var (
blockProposalTimeMilliseconds = promauto.NewSummary(prometheus.SummaryOpts{
Name: "block_proposal_time_milliseconds",
Help: "Total time in milliseconds to propose a block",
})
localPayloadValue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "local_payload_value_gwei",
Help: "The value of the local payload",
})
)
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot.
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
@@ -58,6 +75,12 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
"sinceSlotStartTime": time.Since(t),
}).Info("Begin building block")
bf := bytes.NewBuffer([]byte{})
startTime := time.Now()
if err := pprof.StartCPUProfile(bf); err != nil {
log.WithError(err).Error("could not start cpu profile")
}
// A syncing validator should not produce a block.
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
@@ -169,6 +192,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
"validator": sBlk.Block().ProposerIndex(),
}).Info("Finished building block")
blockProposalTimeMilliseconds.Observe(float64(time.Since(t).Milliseconds()))
pprof.StopCPUProfile()
if time.Since(startTime) > 500*time.Millisecond {
dbPath := path.Join("/home/t", fmt.Sprintf("%d_%d_proposer.profile", req.Slot, time.Since(startTime).Milliseconds()))
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
log.WithError(err).Error("could not write profile")
}
}
pb, err := sBlk.Block().Proto()
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not convert block to proto: %v", err)

View File

@@ -6,7 +6,6 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -15,7 +14,6 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation"
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"go.opencensus.io/trace"
)
@@ -83,26 +81,9 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
func (a proposerAtts) filter(ctx context.Context, st state.BeaconState) (proposerAtts, proposerAtts) {
validAtts := make([]*ethpb.Attestation, 0, len(a))
invalidAtts := make([]*ethpb.Attestation, 0, len(a))
var attestationProcessor func(context.Context, state.BeaconState, *ethpb.Attestation) (state.BeaconState, error)
if st.Version() == version.Phase0 {
attestationProcessor = blocks.ProcessAttestationNoVerifySignature
} else if st.Version() >= version.Altair {
// Use a wrapper here, as go needs strong typing for the function signature.
attestationProcessor = func(ctx context.Context, st state.BeaconState, attestation *ethpb.Attestation) (state.BeaconState, error) {
totalBalance, err := helpers.TotalActiveBalance(st)
if err != nil {
return nil, err
}
return altair.ProcessAttestationNoVerifySignature(ctx, st, attestation, totalBalance)
}
} else {
// Exit early if there is an unknown state type.
return validAtts, invalidAtts
}
for _, att := range a {
if _, err := attestationProcessor(ctx, st, att); err == nil {
if err := blocks.VerifyAttestationNoVerifySignature(ctx, st, att); err == nil {
validAtts = append(validAtts, att)
continue
}

View File

@@ -53,6 +53,11 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc
if localPayload == nil {
return errors.New("local payload is nil")
}
localValueGwei, err := localPayload.ValueInGwei()
if err != nil {
return errors.Wrap(err, "failed to get local payload value")
}
localPayloadValue.Set(float64(localValueGwei))
// Use local payload if builder payload is nil.
if builderPayload == nil {

View File

@@ -54,7 +54,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
slot := blk.Slot()
vIdx := blk.ProposerIndex()
headRoot := blk.ParentRoot()
proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
_, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
feeRecipient := params.BeaconConfig().DefaultFeeRecipient
recipient, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, vIdx)
switch err == nil {
@@ -76,7 +76,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
return nil, errors.Wrap(err, "could not get fee recipient in db")
}
if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
if ok && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
var pid [8]byte
copy(pid[:], payloadId[:])
payloadIDCacheHit.Inc()

View File

@@ -31,6 +31,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
@@ -186,3 +187,32 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
}
return stream.Send(res)
}
func (vs *Server) ProposeEverySlot() {
for vs.TimeFetcher.GenesisTime().IsZero() {
time.Sleep(time.Second)
}
genTime := vs.TimeFetcher.GenesisTime()
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-vs.Ctx.Done():
ticker.Done()
return
case slot := <-ticker.C():
curr := time.Now()
time.Sleep(50 * time.Millisecond)
_, err := vs.GetBeaconBlock(context.Background(), &ethpb.BlockRequest{
Slot: slot,
Graffiti: make([]byte, 32),
RandaoReveal: make([]byte, 96),
})
if err != nil {
log.Error(err)
continue
}
log.Infof("Successfully produced block %d in %s", slot, time.Since(curr).String())
}
}
}

View File

@@ -443,6 +443,8 @@ func (s *Service) Start() {
// Register reflection service on gRPC server.
reflection.Register(s.grpcServer)
go validatorServer.ProposeEverySlot()
go func() {
if s.listener != nil {
if err := s.grpcServer.Serve(s.listener); err != nil {

View File

@@ -210,7 +210,7 @@ func (q *blocksQueue) loop() {
"epoch": slots.ToEpoch(fsm.start),
"start": fsm.start,
"error": err.Error(),
}).Debug("Can not trigger event")
}).Trace("Can not trigger event")
if errors.Is(err, errNoRequiredPeers) {
forceExit := q.exitConditions.noRequiredPeersErrRetries > noRequiredPeersErrMaxRetries
if q.mode == modeStopOnFinalizedEpoch || forceExit {

View File

@@ -189,7 +189,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return
@@ -209,7 +209,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return

View File

@@ -45,8 +45,10 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
return err
}
s.rateLimiter.add(stream, 1)
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
if goodbyeMessage(*m) != "client has too many peers" {
log := log.WithField("Reason", goodbyeMessage(*m))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
}
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
// closes all streams with the peer
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
@@ -87,7 +89,7 @@ func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code p2ptypes.RP
log.WithFields(logrus.Fields{
"error": err,
"peer": id,
}).Debug("Could not send goodbye message to peer")
}).Trace("Could not send goodbye message to peer")
}
return s.cfg.p2p.Disconnect(id)
}

View File

@@ -73,7 +73,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
log.WithError(err).Debug("Could not decode goodbye stream message")
return
}
log.WithError(err).Debug("Could not decode stream message")
log.WithError(err).Trace("Could not decode stream message")
return
}
if err := handle(context.Background(), msg, stream); err != nil {
@@ -89,7 +89,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
return
}
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
log.WithError(err).Debug("Could not decode stream message")
log.WithError(err).Trace("Could not decode stream message")
return
}
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {