Verified roblobs (#13190)

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
kasey
2023-11-21 12:44:38 -06:00
committed by GitHub
parent a7edec9b98
commit 1123df7432
68 changed files with 1263 additions and 913 deletions

View File

@@ -14,9 +14,11 @@ import (
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
@@ -144,7 +146,7 @@ func SendBeaconBlocksByRootRequest(
return blocks, nil
}
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.DeprecatedBlobSidecar, error) {
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]blocks.ROBlob, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, err
@@ -166,7 +168,7 @@ func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle,
func SendBlobSidecarByRoot(
ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.P2P, pid peer.ID,
ctxMap ContextByteVersions, req *p2ptypes.BlobSidecarsByRootReq,
) ([]*pb.DeprecatedBlobSidecar, error) {
) ([]blocks.ROBlob, error) {
if uint64(len(*req)) > params.BeaconNetworkConfig().MaxRequestBlobSidecars {
return nil, errors.Wrapf(p2ptypes.ErrMaxBlobReqExceeded, "length=%d", len(*req))
}
@@ -189,16 +191,16 @@ func SendBlobSidecarByRoot(
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max)
}
type blobResponseValidation func(*pb.DeprecatedBlobSidecar) error
type blobResponseValidation func(blocks.ROBlob) error
func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
roots := make(map[[32]byte]bool)
for _, sc := range *req {
roots[bytesutil.ToBytes32(sc.BlockRoot)] = true
}
return func(sc *pb.DeprecatedBlobSidecar) error {
if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested {
return errors.Wrapf(errUnrequestedRoot, "root=%#x", sc.BlockRoot)
return func(sc blocks.ROBlob) error {
if requested := roots[sc.BlockRoot()]; !requested {
return errors.Wrapf(errUnrequestedRoot, "root=%#x", sc.BlockRoot())
}
return nil
}
@@ -206,16 +208,16 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV
func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseValidation {
end := req.StartSlot + primitives.Slot(req.Count)
return func(sc *pb.DeprecatedBlobSidecar) error {
if sc.Slot < req.StartSlot || sc.Slot >= end {
return errors.Wrapf(errBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot)
return func(sc blocks.ROBlob) error {
if sc.Slot() < req.StartSlot || sc.Slot() >= end {
return errors.Wrapf(errBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot())
}
return nil
}
}
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]*pb.DeprecatedBlobSidecar, error) {
sidecars := make([]*pb.DeprecatedBlobSidecar, 0)
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]blocks.ROBlob, error) {
sidecars := make([]blocks.ROBlob, 0)
// Attempt an extra read beyond max to check if the peer is violating the spec by
// sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested.
for i := uint64(0); i < max+1; i++ {
@@ -238,7 +240,9 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi
return sidecars, nil
}
func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (*pb.DeprecatedBlobSidecar, error) {
func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (blocks.ROBlob, error) {
var b blocks.ROBlob
pb := &ethpb.BlobSidecar{}
decode := encoding.DecodeWithMaxLength
var (
code uint8
@@ -246,31 +250,35 @@ func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncod
)
code, msg, err := ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
return b, err
}
if code != 0 {
return nil, errors.Wrap(errBlobChunkedReadFailure, msg)
return b, errors.Wrap(errBlobChunkedReadFailure, msg)
}
ctxb, err := readContextFromStream(stream)
if err != nil {
return nil, errors.Wrap(err, "error reading chunk context bytes from stream")
return b, errors.Wrap(err, "error reading chunk context bytes from stream")
}
v, found := ctxMap[bytesutil.ToBytes4(ctxb)]
if !found {
return nil, errors.Wrapf(errBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb))
return b, errors.Wrapf(errBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb))
}
// Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs.
if v != version.Deneb {
return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v))
return b, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v))
}
sc := &pb.DeprecatedBlobSidecar{}
if err := decode(stream, sc); err != nil {
return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
if err := vf(sc); err != nil {
return nil, errors.Wrap(err, "validation failure decoding blob RPC response")
if err := decode(stream, pb); err != nil {
return b, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
return sc, nil
rob, err := blocks.NewROBlob(pb)
if err != nil {
return b, errors.Wrap(err, "unexpected error initializing ROBlob")
}
if err := vf(rob); err != nil {
return b, errors.Wrap(err, "validation failure decoding blob RPC response")
}
return rob, nil
}