Files
prysm/cmd/prysmctl/p2p/request_blocks.go
Preston Van Loon 2fd6bd8150 Add golang.org/x/tools modernize static analyzer and fix violations (#15946)
* Ran gopls modernize to fix everything

go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./...

* Override rules_go provided dependency for golang.org/x/tools to v0.38.0.

To update this, checked out rules_go, then ran `bazel run //go/tools/releaser -- upgrade-dep -mirror=false org_golang_x_tools` and copied the patches.

* Fix buildtag violations and ignore buildtag violations in external

* Introduce modernize analyzer package.

* Add modernize "any" analyzer.

* Fix violations of any analyzer

* Add modernize "appendclipped" analyzer.

* Fix violations of appendclipped

* Add modernize "bloop" analyzer.

* Add modernize "fmtappendf" analyzer.

* Add modernize "forvar" analyzer.

* Add modernize "mapsloop" analyzer.

* Add modernize "minmax" analyzer.

* Fix violations of minmax analyzer

* Add modernize "omitzero" analyzer.

* Add modernize "rangeint" analyzer.

* Fix violations of rangeint.

* Add modernize "reflecttypefor" analyzer.

* Fix violations of reflecttypefor analyzer.

* Add modernize "slicescontains" analyzer.

* Add modernize "slicessort" analyzer.

* Add modernize "slicesdelete" analyzer. This is disabled by default for now. See https://go.dev/issue/73686.

* Add modernize "stringscutprefix" analyzer.

* Add modernize "stringsbuilder" analyzer.

* Fix violations of stringsbuilder analyzer.

* Add modernize "stringsseq" analyzer.

* Add modernize "testingcontext" analyzer.

* Add modernize "waitgroup" analyzer.

* Changelog fragment

* gofmt

* gazelle

* Add modernize "newexpr" analyzer.

* Disable newexpr until go1.26

* Add more details in WORKSPACE on how to update the override

* @nalepae feedback on min()

* gofmt

* Fix violations of forvar
2025-11-14 01:27:22 +00:00

260 lines
7.1 KiB
Go

package p2p
import (
"context"
"strings"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/sync"
"github.com/OffchainLabs/prysm/v7/cmd"
"github.com/OffchainLabs/prysm/v7/config/params"
consensus_types "github.com/OffchainLabs/prysm/v7/consensus-types"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
pb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
corenet "github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"google.golang.org/protobuf/types/known/emptypb"
)
// requestBlocksFlags stores the parsed values of the flags accepted by the
// beacon-blocks-by-range command. Every field starts at its zero value and is
// filled in through the Destination pointer of the matching flag.
var requestBlocksFlags struct {
	// Network names the network whose chain config should be used.
	Network string
	// Peers is a comma-separated list of peer multiaddrs to connect to.
	Peers string
	// ClientPortTCP is the TCP port for the local libp2p host.
	ClientPortTCP uint
	// ClientPortQUIC is the QUIC port for the local libp2p host.
	ClientPortQUIC uint
	// APIEndpoints is a comma-separated list of Prysm gRPC API endpoints.
	APIEndpoints string
	// StartSlot is the first slot of the blocks-by-range request.
	StartSlot uint64
	// Count is the number of blocks to request.
	Count uint64
	// Step is the step value sent in the blocks-by-range request.
	Step uint64
}
// requestBlocksCmd is the "beacon-blocks-by-range" CLI command. Its flags are
// bound to the fields of requestBlocksFlags, and its action delegates to
// cliActionRequestBlocks.
var requestBlocksCmd = &cli.Command{
	Name:  "beacon-blocks-by-range",
	Usage: "Request a range of blocks from a beacon node via a p2p connection",
	Action: func(cliCtx *cli.Context) error {
		// Any failure is logged at fatal level here, so nil is returned in
		// every case that reaches the return statement.
		if err := cliActionRequestBlocks(cliCtx); err != nil {
			log.WithError(err).Fatal("Could not request blocks by range")
		}
		return nil
	},
	Flags: []cli.Flag{
		cmd.ChainConfigFileFlag,
		&cli.StringFlag{
			Name: "network",
			// Keep this list in sync with the networks handled by
			// cliActionRequestBlocks.
			Usage:       "network to run on (mainnet, sepolia, holesky, hoodi)",
			Destination: &requestBlocksFlags.Network,
			Value:       "mainnet",
		},
		&cli.StringFlag{
			Name:        "peer-multiaddrs",
			Usage:       "comma-separated, peer multiaddr(s) to connect to for p2p requests",
			Destination: &requestBlocksFlags.Peers,
			Value:       "",
		},
		&cli.UintFlag{
			Name:        "client-port-tcp",
			Aliases:     []string{"client-port"},
			Usage:       "TCP port to use for the client as a libp2p host",
			Destination: &requestBlocksFlags.ClientPortTCP,
			Value:       13001,
		},
		&cli.UintFlag{
			Name:        "client-port-quic",
			Usage:       "QUIC port to use for the client as a libp2p host",
			Destination: &requestBlocksFlags.ClientPortQUIC,
			Value:       13001,
		},
		&cli.StringFlag{
			Name:        "prysm-api-endpoints",
			Usage:       "comma-separated, gRPC API endpoint(s) for Prysm beacon node(s)",
			Destination: &requestBlocksFlags.APIEndpoints,
			Value:       "localhost:4000",
		},
		&cli.Uint64Flag{
			Name:        "start-slot",
			Usage:       "start slot for blocks by range request. If unset, will use start_slot(current_epoch-1)",
			Destination: &requestBlocksFlags.StartSlot,
			Value:       0,
		},
		&cli.Uint64Flag{
			Name:        "count",
			Usage:       "number of blocks to request, (default 32)",
			Destination: &requestBlocksFlags.Count,
			Value:       32,
		},
		&cli.Uint64Flag{
			Name:        "step",
			Usage:       "number of steps of blocks in the range request, (default 1)",
			Destination: &requestBlocksFlags.Step,
			Value:       1,
		},
	},
}
// cliActionRequestBlocks runs the beacon-blocks-by-range command.
//
// It activates the chain config for the selected network (optionally
// overridden by a chain config file), creates a p2p client, connects to the
// peers given on the command line (or, when none are given, to the peers
// retrieved from the configured API endpoints), and then sends one
// blocks-by-range request to every peer in the host's peerstore except itself.
// For each response it logs the number of blocks received, the elapsed time,
// and how many blocks carry a readable execution payload with transactions.
//
// The whole operation is bounded by a 5 minute timeout. All failures are
// returned to the caller; the first failing peer request aborts the run.
func cliActionRequestBlocks(cliCtx *cli.Context) error {
	// Select the network config. Errors are returned rather than logged with
	// Fatal so that the caller decides how to report them.
	switch requestBlocksFlags.Network {
	case params.SepoliaName:
		if err := params.SetActive(params.SepoliaConfig()); err != nil {
			return errors.Wrap(err, "could not set sepolia config")
		}
	case params.HoleskyName:
		if err := params.SetActive(params.HoleskyConfig()); err != nil {
			return errors.Wrap(err, "could not set holesky config")
		}
	case params.HoodiName:
		if err := params.SetActive(params.HoodiConfig()); err != nil {
			return errors.Wrap(err, "could not set hoodi config")
		}
	case params.MainnetName:
		// Do nothing
	default:
		return errors.Errorf("unknown network provided: %s", requestBlocksFlags.Network)
	}

	// An explicitly provided chain config file is loaded after the network
	// selection above.
	if cliCtx.IsSet(cmd.ChainConfigFileFlag.Name) {
		chainConfigFileName := cliCtx.String(cmd.ChainConfigFileFlag.Name)
		if err := params.LoadChainConfigFile(chainConfigFileName, nil); err != nil {
			return err
		}
	}
	p2ptypes.InitializeDataMaps()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	allAPIEndpoints := splitCommaSeparated(requestBlocksFlags.APIEndpoints)
	c, err := newClient(allAPIEndpoints, requestBlocksFlags.ClientPortTCP, requestBlocksFlags.ClientPortQUIC)
	if err != nil {
		return err
	}
	defer c.Close()

	// Fall back to asking the API endpoints for peers when none were given.
	allPeers := splitCommaSeparated(requestBlocksFlags.Peers)
	if len(allPeers) == 0 {
		allPeers, err = c.retrievePeerAddressesViaRPC(ctx, allAPIEndpoints)
		if err != nil {
			return err
		}
	}
	if len(allPeers) == 0 {
		return errors.New("no peers found")
	}
	log.WithField("peers", allPeers).Info("List of peers")

	chain, err := c.initializeMockChainService(ctx)
	if err != nil {
		return err
	}
	c.registerHandshakeHandlers()

	// Incoming blocks-by-range requests are accepted and ignored.
	noopHandler := func(context.Context, any, libp2pcore.Stream) error {
		return nil
	}
	c.registerRPCHandler(p2p.RPCBlocksByRangeTopicV1, noopHandler)
	c.registerRPCHandler(p2p.RPCBlocksByRangeTopicV2, noopHandler)

	if err := c.connectToPeers(ctx, allPeers...); err != nil {
		return err
	}

	// A start slot of 0 is treated as "unset": the request then starts at the
	// first slot of the epoch before the current head epoch.
	startSlot := primitives.Slot(requestBlocksFlags.StartSlot)
	var headSlot *primitives.Slot
	if startSlot == 0 {
		headResp, err := c.beaconClient.GetChainHead(ctx, &emptypb.Empty{})
		if err != nil {
			return errors.Wrap(err, "could not get chain head")
		}
		// Do not decrement epoch 0: the subtraction would underflow.
		// NOTE(review): Epoch.Sub is believed to panic on underflow — confirm.
		prevEpoch := headResp.HeadEpoch
		if prevEpoch > 0 {
			prevEpoch = prevEpoch.Sub(1)
		}
		startSlot, err = slots.EpochStart(prevEpoch)
		if err != nil {
			return err
		}
		headSlot = &headResp.HeadSlot
	}

	// Submit requests.
	selfID := c.host.ID().String()
	for _, pr := range c.host.Peerstore().Peers() {
		// The peerstore may contain the local host itself; skip it.
		if pr.String() == selfID {
			continue
		}
		req := &pb.BeaconBlocksByRangeRequest{
			StartSlot: startSlot,
			Count:     requestBlocksFlags.Count,
			Step:      requestBlocksFlags.Step,
		}
		fields := logrus.Fields{
			"startSlot": startSlot,
			"count":     requestBlocksFlags.Count,
			"step":      requestBlocksFlags.Step,
			"peer":      pr.String(),
		}
		// headSlot is only known when the start slot was derived from the head.
		if headSlot != nil {
			fields["headSlot"] = *headSlot
		}
		log.WithFields(fields).Info("Sending blocks by range p2p request to peer")

		start := time.Now()
		blocks, err := sync.SendBeaconBlocksByRangeRequest(
			ctx,
			chain,
			c,
			pr,
			req,
			nil, /* no extra block processing */
		)
		if err != nil {
			return errors.Wrapf(err, "blocks by range request to peer %s", pr.String())
		}
		end := time.Since(start)

		// Count the blocks whose execution payload and transactions can be
		// read. Blocks that do not support these fields are skipped silently;
		// any other read error is logged and the block is skipped.
		totalExecutionBlocks := 0
		for _, blk := range blocks {
			exec, err := blk.Block().Body().Execution()
			if err != nil {
				if !errors.Is(err, consensus_types.ErrUnsupportedField) {
					log.WithError(err).Error("Could not read execution data from block body")
				}
				continue
			}
			if _, err := exec.Transactions(); err != nil {
				if !errors.Is(err, consensus_types.ErrUnsupportedField) {
					log.WithError(err).Error("Could not read transactions block execution payload")
				}
				continue
			}
			totalExecutionBlocks++
		}
		log.WithFields(logrus.Fields{
			"numBlocks":                           len(blocks),
			"peer":                                pr.String(),
			"timeFromSendingToProcessingResponse": end,
			"totalBlocksWithExecutionPayloads":    totalExecutionBlocks,
		}).Info("Received blocks from peer")
	}
	return nil
}

// splitCommaSeparated splits a comma-separated flag value into its entries,
// trimming surrounding whitespace and dropping empty entries. It always
// returns a non-nil slice, which is empty when s holds no entries.
func splitCommaSeparated(s string) []string {
	parts := strings.Split(s, ",")
	entries := make([]string, 0, len(parts))
	for _, part := range parts {
		if part = strings.TrimSpace(part); part != "" {
			entries = append(entries, part)
		}
	}
	return entries
}
// closeStream closes the given stream. A close failure is logged and not
// returned to the caller.
func closeStream(stream corenet.Stream) {
	if err := stream.Close(); err != nil {
		// Attach the error as a structured field with a descriptive message,
		// matching how errors are logged elsewhere in this file.
		log.WithError(err).Error("Could not close stream")
	}
}