mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
* Ran gopls modernize to fix everything go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./... * Override rules_go provided dependency for golang.org/x/tools to v0.38.0. To update this, checked out rules_go, then ran `bazel run //go/tools/releaser -- upgrade-dep -mirror=false org_golang_x_tools` and copied the patches. * Fix buildtag violations and ignore buildtag violations in external * Introduce modernize analyzer package. * Add modernize "any" analyzer. * Fix violations of any analyzer * Add modernize "appendclipped" analyzer. * Fix violations of appendclipped * Add modernize "bloop" analyzer. * Add modernize "fmtappendf" analyzer. * Add modernize "forvar" analyzer. * Add modernize "mapsloop" analyzer. * Add modernize "minmax" analyzer. * Fix violations of minmax analyzer * Add modernize "omitzero" analyzer. * Add modernize "rangeint" analyzer. * Fix violations of rangeint. * Add modernize "reflecttypefor" analyzer. * Fix violations of reflecttypefor analyzer. * Add modernize "slicescontains" analyzer. * Add modernize "slicessort" analyzer. * Add modernize "slicesdelete" analyzer. This is disabled by default for now. See https://go.dev/issue/73686. * Add modernize "stringscutprefix" analyzer. * Add modernize "stringsbuilder" analyzer. * Fix violations of stringsbuilder analyzer. * Add modernize "stringsseq" analyzer. * Add modernize "testingcontext" analyzer. * Add modernize "waitgroup" analyzer. * Changelog fragment * gofmt * gazelle * Add modernize "newexpr" analyzer. * Disable newexpr until go1.26 * Add more details in WORKSPACE on how to update the override * @nalepae feedback on min() * gofmt * Fix violations of forvar
108 lines
3.5 KiB
Go
108 lines
3.5 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"runtime/debug"
|
|
"strings"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
|
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
|
|
libp2pcore "github.com/libp2p/go-libp2p/core"
|
|
corenet "github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
"github.com/pkg/errors"
|
|
ssz "github.com/prysmaticlabs/fastssz"
|
|
)
|
|
|
|
// rpcHandler is the callback signature invoked for an incoming RPC stream.
// It receives a context, the request message as an untyped value, and the
// libp2p stream the request arrived on, and reports failure through the
// returned error.
type rpcHandler func(context.Context, any, libp2pcore.Stream) error
|
|
|
|
// registerRPC for a given topic with an expected protobuf message type.
|
|
func (c *client) registerRPCHandler(baseTopic string, handle rpcHandler) {
|
|
topic := baseTopic + c.Encoding().ProtocolSuffix()
|
|
c.host.SetStreamHandler(protocol.ID(topic), func(stream corenet.Stream) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.WithField("error", r).Error("Panic occurred")
|
|
log.Errorf("%s", debug.Stack())
|
|
}
|
|
}()
|
|
// Resetting after closing is a no-op so defer a reset in case something goes wrong.
|
|
// It's up to the handler to Close the stream (send an EOF) if
|
|
// it successfully writes a response. We don't blindly call
|
|
// Close here because we may have only written a partial
|
|
// response.
|
|
defer func() {
|
|
_err := stream.Reset()
|
|
_ = _err
|
|
}()
|
|
|
|
log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol()))
|
|
|
|
base, ok := p2p.RPCTopicMappings[baseTopic]
|
|
if !ok {
|
|
log.Errorf("Could not retrieve base message for topic %s", baseTopic)
|
|
return
|
|
}
|
|
t := reflect.TypeOf(base)
|
|
// Copy Base
|
|
base = reflect.New(t)
|
|
|
|
// since metadata requests do not have any data in the payload, we
|
|
// do not decode anything.
|
|
if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 || baseTopic == p2p.RPCMetaDataTopicV3 {
|
|
if err := handle(context.Background(), base, stream); err != nil {
|
|
if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) {
|
|
log.WithError(err).Debug("Could not handle p2p RPC")
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Given we have an input argument that can be pointer or the actual object, this gives us
|
|
// a way to check for its reflect.Kind and based on the result, we can decode
|
|
// accordingly.
|
|
if t.Kind() == reflect.Ptr {
|
|
msg, ok := reflect.New(t.Elem()).Interface().(ssz.Unmarshaler)
|
|
if !ok {
|
|
log.Errorf("message of %T does not support marshaller interface", msg)
|
|
return
|
|
}
|
|
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
|
// Trace logs for goodbye errors
|
|
logStreamErrors(err, topic)
|
|
return
|
|
}
|
|
if err := handle(context.Background(), msg, stream); err != nil {
|
|
if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) {
|
|
log.WithError(err).Debug("Could not handle p2p RPC")
|
|
}
|
|
}
|
|
} else {
|
|
nTyp := reflect.New(t)
|
|
msg, ok := nTyp.Interface().(ssz.Unmarshaler)
|
|
if !ok {
|
|
log.Errorf("message of %T does not support marshaller interface", msg)
|
|
return
|
|
}
|
|
if err := c.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
|
|
logStreamErrors(err, topic)
|
|
return
|
|
}
|
|
if err := handle(context.Background(), nTyp.Elem().Interface(), stream); err != nil {
|
|
if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) {
|
|
log.WithError(err).Debug("Could not handle p2p RPC")
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func logStreamErrors(err error, topic string) {
|
|
if strings.Contains(topic, p2p.RPCGoodByeTopicV1) {
|
|
log.WithError(err).Trace("Could not decode goodbye stream message")
|
|
return
|
|
}
|
|
log.WithError(err).Debug("Could not decode stream message")
|
|
}
|