mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
16 Commits
d929e1dcaa
...
terence
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92a88e1ff2 | ||
|
|
f1c6f07e87 | ||
|
|
441b7ee4b7 | ||
|
|
b981477ae6 | ||
|
|
535c1cb5c3 | ||
|
|
6619063187 | ||
|
|
c5e0caf80c | ||
|
|
6f3dec79a4 | ||
|
|
99491be4a0 | ||
|
|
8f3f278edd | ||
|
|
e80bffb1f0 | ||
|
|
3058bc1661 | ||
|
|
830d331c6d | ||
|
|
0960fba6d1 | ||
|
|
2cec1b6699 | ||
|
|
439de222e2 |
@@ -71,6 +71,7 @@ go_library(
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/bls:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//math:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//proto/engine/v1:go_default_library",
|
||||
|
||||
@@ -3,6 +3,10 @@ package blockchain
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
time2 "time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
@@ -15,6 +19,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
|
||||
ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@@ -55,6 +60,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
|
||||
return err
|
||||
}
|
||||
|
||||
bf := bytes.NewBuffer([]byte{})
|
||||
if err := pprof.StartCPUProfile(bf); err != nil {
|
||||
log.WithError(err).Error("could not start cpu profile")
|
||||
}
|
||||
|
||||
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get block's prestate")
|
||||
@@ -163,6 +173,14 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
|
||||
log.WithError(err).Error("Unable to log state transition data")
|
||||
}
|
||||
|
||||
pprof.StopCPUProfile()
|
||||
if time.Since(receivedTime) > 3/2*time2.Second {
|
||||
dbPath := path.Join("/home/t", fmt.Sprintf("processing_%d_%d.profile", blockCopy.Block().Slot(), time.Since(receivedTime).Milliseconds()))
|
||||
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
|
||||
log.WithError(err).Error("could not write profile")
|
||||
}
|
||||
}
|
||||
|
||||
chainServiceProcessingTime.Observe(float64(time.Since(receivedTime).Milliseconds()))
|
||||
|
||||
return nil
|
||||
|
||||
@@ -97,7 +97,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
|
||||
"direction": conn.Stat().Direction,
|
||||
"multiAddr": peerMultiaddrString(conn),
|
||||
"activePeers": len(s.peers.Active()),
|
||||
}).Debug("Peer connected")
|
||||
}).Trace("Peer connected")
|
||||
}
|
||||
|
||||
// Do not perform handshake on inbound dials.
|
||||
@@ -173,7 +173,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
|
||||
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
|
||||
// Only log disconnections if we were fully connected.
|
||||
if priorState == peers.PeerConnected {
|
||||
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
|
||||
log.WithField("activePeers", len(s.peers.Active())).Trace("Peer disconnected")
|
||||
}
|
||||
}()
|
||||
},
|
||||
|
||||
@@ -76,6 +76,7 @@ go_library(
|
||||
"//crypto/rand:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//encoding/ssz:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//network/forks:go_default_library",
|
||||
"//proto/engine/v1:go_default_library",
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -12,6 +15,8 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
emptypb "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/builder"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
|
||||
@@ -25,6 +30,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -42,6 +48,17 @@ const (
|
||||
eth1dataTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
blockProposalTimeMilliseconds = promauto.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "block_proposal_time_milliseconds",
|
||||
Help: "Total time in milliseconds to propose a block",
|
||||
})
|
||||
localPayloadValue = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "local_payload_value_gwei",
|
||||
Help: "The value of the local payload",
|
||||
})
|
||||
)
|
||||
|
||||
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
|
||||
// by passing in the slot and the signed randao reveal of the slot.
|
||||
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
|
||||
@@ -58,6 +75,12 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
"sinceSlotStartTime": time.Since(t),
|
||||
}).Info("Begin building block")
|
||||
|
||||
bf := bytes.NewBuffer([]byte{})
|
||||
startTime := time.Now()
|
||||
if err := pprof.StartCPUProfile(bf); err != nil {
|
||||
log.WithError(err).Error("could not start cpu profile")
|
||||
}
|
||||
|
||||
// A syncing validator should not produce a block.
|
||||
if vs.SyncChecker.Syncing() {
|
||||
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
|
||||
@@ -169,6 +192,16 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
|
||||
"validator": sBlk.Block().ProposerIndex(),
|
||||
}).Info("Finished building block")
|
||||
|
||||
blockProposalTimeMilliseconds.Observe(float64(time.Since(t).Milliseconds()))
|
||||
|
||||
pprof.StopCPUProfile()
|
||||
if time.Since(startTime) > 500*time.Millisecond {
|
||||
dbPath := path.Join("/home/t", fmt.Sprintf("%d_%d_proposer.profile", req.Slot, time.Since(startTime).Milliseconds()))
|
||||
if err = file.WriteFile(dbPath, bf.Bytes()); err != nil {
|
||||
log.WithError(err).Error("could not write profile")
|
||||
}
|
||||
}
|
||||
|
||||
pb, err := sBlk.Block().Proto()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not convert block to proto: %v", err)
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
@@ -15,7 +14,6 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation"
|
||||
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -83,26 +81,9 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
func (a proposerAtts) filter(ctx context.Context, st state.BeaconState) (proposerAtts, proposerAtts) {
|
||||
validAtts := make([]*ethpb.Attestation, 0, len(a))
|
||||
invalidAtts := make([]*ethpb.Attestation, 0, len(a))
|
||||
var attestationProcessor func(context.Context, state.BeaconState, *ethpb.Attestation) (state.BeaconState, error)
|
||||
|
||||
if st.Version() == version.Phase0 {
|
||||
attestationProcessor = blocks.ProcessAttestationNoVerifySignature
|
||||
} else if st.Version() >= version.Altair {
|
||||
// Use a wrapper here, as go needs strong typing for the function signature.
|
||||
attestationProcessor = func(ctx context.Context, st state.BeaconState, attestation *ethpb.Attestation) (state.BeaconState, error) {
|
||||
totalBalance, err := helpers.TotalActiveBalance(st)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return altair.ProcessAttestationNoVerifySignature(ctx, st, attestation, totalBalance)
|
||||
}
|
||||
} else {
|
||||
// Exit early if there is an unknown state type.
|
||||
return validAtts, invalidAtts
|
||||
}
|
||||
|
||||
for _, att := range a {
|
||||
if _, err := attestationProcessor(ctx, st, att); err == nil {
|
||||
if err := blocks.VerifyAttestationNoVerifySignature(ctx, st, att); err == nil {
|
||||
validAtts = append(validAtts, att)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -53,6 +53,11 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc
|
||||
if localPayload == nil {
|
||||
return errors.New("local payload is nil")
|
||||
}
|
||||
localValueGwei, err := localPayload.ValueInGwei()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get local payload value")
|
||||
}
|
||||
localPayloadValue.Set(float64(localValueGwei))
|
||||
|
||||
// Use local payload if builder payload is nil.
|
||||
if builderPayload == nil {
|
||||
|
||||
@@ -54,7 +54,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
|
||||
slot := blk.Slot()
|
||||
vIdx := blk.ProposerIndex()
|
||||
headRoot := blk.ParentRoot()
|
||||
proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
|
||||
_, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, headRoot)
|
||||
feeRecipient := params.BeaconConfig().DefaultFeeRecipient
|
||||
recipient, err := vs.BeaconDB.FeeRecipientByValidatorID(ctx, vIdx)
|
||||
switch err == nil {
|
||||
@@ -76,7 +76,7 @@ func (vs *Server) getLocalPayload(ctx context.Context, blk interfaces.ReadOnlyBe
|
||||
return nil, errors.Wrap(err, "could not get fee recipient in db")
|
||||
}
|
||||
|
||||
if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
|
||||
if ok && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID.
|
||||
var pid [8]byte
|
||||
copy(pid[:], payloadId[:])
|
||||
payloadIDCacheHit.Inc()
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/forks"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
@@ -186,3 +187,32 @@ func (vs *Server) WaitForChainStart(_ *emptypb.Empty, stream ethpb.BeaconNodeVal
|
||||
}
|
||||
return stream.Send(res)
|
||||
}
|
||||
|
||||
func (vs *Server) ProposeEverySlot() {
|
||||
for vs.TimeFetcher.GenesisTime().IsZero() {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
genTime := vs.TimeFetcher.GenesisTime()
|
||||
|
||||
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
case <-vs.Ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case slot := <-ticker.C():
|
||||
curr := time.Now()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
_, err := vs.GetBeaconBlock(context.Background(), ðpb.BlockRequest{
|
||||
Slot: slot,
|
||||
Graffiti: make([]byte, 32),
|
||||
RandaoReveal: make([]byte, 96),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
log.Infof("Successfully produced block %d in %s", slot, time.Since(curr).String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -443,6 +443,8 @@ func (s *Service) Start() {
|
||||
// Register reflection service on gRPC server.
|
||||
reflection.Register(s.grpcServer)
|
||||
|
||||
go validatorServer.ProposeEverySlot()
|
||||
|
||||
go func() {
|
||||
if s.listener != nil {
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
|
||||
@@ -210,7 +210,7 @@ func (q *blocksQueue) loop() {
|
||||
"epoch": slots.ToEpoch(fsm.start),
|
||||
"start": fsm.start,
|
||||
"error": err.Error(),
|
||||
}).Debug("Can not trigger event")
|
||||
}).Trace("Can not trigger event")
|
||||
if errors.Is(err, errNoRequiredPeers) {
|
||||
forceExit := q.exitConditions.noRequiredPeersErrRetries > noRequiredPeersErrMaxRetries
|
||||
if q.mode == modeStopOnFinalizedEpoch || forceExit {
|
||||
|
||||
@@ -189,7 +189,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
|
||||
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return
|
||||
@@ -209,7 +209,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
|
||||
log.WithError(err).WithField("topic", topic).Trace("Could not decode stream message")
|
||||
tracing.AnnotateError(span, err)
|
||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||
return
|
||||
|
||||
@@ -45,8 +45,10 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l
|
||||
return err
|
||||
}
|
||||
s.rateLimiter.add(stream, 1)
|
||||
log := log.WithField("Reason", goodbyeMessage(*m))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
|
||||
if goodbyeMessage(*m) != "client has too many peers" {
|
||||
log := log.WithField("Reason", goodbyeMessage(*m))
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message")
|
||||
}
|
||||
s.cfg.p2p.Peers().SetNextValidTime(stream.Conn().RemotePeer(), goodByeBackoff(*m))
|
||||
// closes all streams with the peer
|
||||
return s.cfg.p2p.Disconnect(stream.Conn().RemotePeer())
|
||||
@@ -87,7 +89,7 @@ func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code p2ptypes.RP
|
||||
log.WithFields(logrus.Fields{
|
||||
"error": err,
|
||||
"peer": id,
|
||||
}).Debug("Could not send goodbye message to peer")
|
||||
}).Trace("Could not send goodbye message to peer")
|
||||
}
|
||||
return s.cfg.p2p.Disconnect(id)
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
|
||||
log.WithError(err).Debug("Could not decode goodbye stream message")
|
||||
return
|
||||
}
|
||||
log.WithError(err).Debug("Could not decode stream message")
|
||||
log.WithError(err).Trace("Could not decode stream message")
|
||||
return
|
||||
}
|
||||
if err := handle(context.Background(), msg, stream); err != nil {
|
||||
@@ -89,7 +89,7 @@ func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
|
||||
return
|
||||
}
|
||||
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
||||
log.WithError(err).Debug("Could not decode stream message")
|
||||
log.WithError(err).Trace("Could not decode stream message")
|
||||
return
|
||||
}
|
||||
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user