Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-01-08 23:18:15 -05:00
Ensure SubmitAttesterSlashingsV2 returns immediately when the Eth-Consensus-Version header is missing. Without this early return the handler calls version.FromString with an empty value and writes a second JSON error to the response, producing invalid JSON and duplicating error output. This change aligns the handler with the error-handling pattern used in other endpoints that validate the version header.
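The pattern in question, as it now reads in SubmitAttesterSlashingsV2 (and already in SubmitAttestationsV2) further down this file — excerpted here for reference:

	versionHeader := r.Header.Get(api.VersionHeader)
	if versionHeader == "" {
		httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
		return // without this return, version.FromString("") below would write a second JSON error
	}
	v, err := version.FromString(versionHeader)
	if err != nil {
		httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest)
		return
	}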
878 lines · 30 KiB · Go
package beacon

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/OffchainLabs/prysm/v7/api"
	"github.com/OffchainLabs/prysm/v7/api/server"
	"github.com/OffchainLabs/prysm/v7/api/server/structs"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
	corehelpers "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/eth/shared"
	"github.com/OffchainLabs/prysm/v7/config/features"
	"github.com/OffchainLabs/prysm/v7/config/params"
	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
	mvslice "github.com/OffchainLabs/prysm/v7/container/multi-value-slice"
	"github.com/OffchainLabs/prysm/v7/crypto/bls"
	"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
	"github.com/OffchainLabs/prysm/v7/network/httputil"
	eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
	"github.com/OffchainLabs/prysm/v7/runtime/version"
	"github.com/OffchainLabs/prysm/v7/time/slots"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

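// broadcastBLSChangesRateLimit caps the number of BLS-to-execution changes
// broadcast per batch in broadcastBLSBatch.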
const broadcastBLSChangesRateLimit = 128

// ListAttestationsV2 retrieves attestations known by the node but
// not necessarily incorporated into any block. Allows filtering by committee index or slot.
func (s *Server) ListAttestationsV2(w http.ResponseWriter, r *http.Request) {
	_, span := trace.StartSpan(r.Context(), "beacon.ListAttestationsV2")
	defer span.End()

	rawSlot, slot, ok := shared.UintFromQuery(w, r, "slot", false)
	if !ok {
		return
	}
	rawCommitteeIndex, committeeIndex, ok := shared.UintFromQuery(w, r, "committee_index", false)
	if !ok {
		return
	}
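	// Derive the response fork version from the slot filter when one is given;
	// otherwise fall back to the node's current slot.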
	v := slots.ToForkVersion(primitives.Slot(slot))
	if rawSlot == "" {
		v = slots.ToForkVersion(s.TimeFetcher.CurrentSlot())
	}

	var attestations []eth.Att
	if features.Get().EnableExperimentalAttestationPool {
		attestations = s.AttestationCache.GetAll()
	} else {
		attestations = s.AttestationsPool.AggregatedAttestations()
		unaggAtts := s.AttestationsPool.UnaggregatedAttestations()
		attestations = append(attestations, unaggAtts...)
	}

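	// Keep only attestations whose concrete type matches the response fork version,
	// converting each one to its API representation.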
	filteredAtts := make([]any, 0, len(attestations))
	for _, att := range attestations {
		var includeAttestation bool
		if v >= version.Electra && att.Version() >= version.Electra {
			attElectra, ok := att.(*eth.AttestationElectra)
			if !ok {
				httputil.HandleError(w, fmt.Sprintf("Unable to convert attestation of type %T", att), http.StatusInternalServerError)
				return
			}

			includeAttestation = shouldIncludeAttestation(attElectra, rawSlot, slot, rawCommitteeIndex, committeeIndex)
			if includeAttestation {
				attStruct := structs.AttElectraFromConsensus(attElectra)
				filteredAtts = append(filteredAtts, attStruct)
			}
		} else if v < version.Electra && att.Version() < version.Electra {
			attPhase0, ok := att.(*eth.Attestation)
			if !ok {
				httputil.HandleError(w, fmt.Sprintf("Unable to convert attestation of type %T", att), http.StatusInternalServerError)
				return
			}

			includeAttestation = shouldIncludeAttestation(attPhase0, rawSlot, slot, rawCommitteeIndex, committeeIndex)
			if includeAttestation {
				attStruct := structs.AttFromConsensus(attPhase0)
				filteredAtts = append(filteredAtts, attStruct)
			}
		}
	}

	attsData, err := json.Marshal(filteredAtts)
	if err != nil {
		httputil.HandleError(w, "Could not marshal attestations: "+err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set(api.VersionHeader, version.String(v))
	httputil.WriteJson(w, &structs.ListAttestationsResponse{
		Version: version.String(v),
		Data:    attsData,
	})
}

// shouldIncludeAttestation reports whether an attestation matches the optional
// slot and committee index filters.
func shouldIncludeAttestation(
	att eth.Att,
	rawSlot string,
	slot uint64,
	rawCommitteeIndex string,
	committeeIndex uint64,
) bool {
	committeeIndexMatch := true
	slotMatch := true
	if rawCommitteeIndex != "" && att.GetCommitteeIndex() != primitives.CommitteeIndex(committeeIndex) {
		committeeIndexMatch = false
	}
	if rawSlot != "" && att.GetData().Slot != primitives.Slot(slot) {
		slotMatch = false
	}
	return committeeIndexMatch && slotMatch
}

// SubmitAttestationsV2 submits attestation objects to the node. If an attestation passes all validation
// constraints, the node MUST publish it on the appropriate subnet.
func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
	defer span.End()

	versionHeader := r.Header.Get(api.VersionHeader)
	if versionHeader == "" {
		httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
		return
	}
	v, err := version.FromString(versionHeader)
	if err != nil {
		httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest)
		return
	}

	var req structs.SubmitAttestationsRequest
	err = json.NewDecoder(r.Body).Decode(&req.Data)
	switch {
	case errors.Is(err, io.EOF):
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	case err != nil:
		httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
		return
	}

	var attFailures []*server.IndexedError
	var failedBroadcasts []*server.IndexedError

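	// Attestation formats differ across the Electra fork, so route based on the
	// version claimed in the request header.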
	if v >= version.Electra {
		attFailures, failedBroadcasts, err = s.handleAttestationsElectra(ctx, req.Data)
	} else {
		attFailures, failedBroadcasts, err = s.handleAttestations(ctx, req.Data)
	}
	if err != nil {
		httputil.HandleError(w, fmt.Sprintf("Failed to handle attestations: %v", err), http.StatusBadRequest)
		return
	}

	if len(attFailures) > 0 {
		failuresErr := &server.IndexedErrorContainer{
			Code:     http.StatusBadRequest,
			Message:  server.ErrIndexedValidationFail,
			Failures: attFailures,
		}
		httputil.WriteError(w, failuresErr)
		return
	}
	if len(failedBroadcasts) > 0 {
		failuresErr := &server.IndexedErrorContainer{
			Code:     http.StatusInternalServerError,
			Message:  server.ErrIndexedBroadcastFail,
			Failures: failedBroadcasts,
		}
		httputil.WriteError(w, failuresErr)
		return
	}
}

func (s *Server) handleAttestationsElectra(
	ctx context.Context,
	data json.RawMessage,
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
	var sourceAttestations []*structs.SingleAttestation
	currentEpoch := slots.ToEpoch(s.TimeFetcher.CurrentSlot())
	if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
		return nil, nil, errors.Errorf("electra attestations have not been enabled, current epoch %d enabled epoch %d", currentEpoch, params.BeaconConfig().ElectraForkEpoch)
	}

	if err = json.Unmarshal(data, &sourceAttestations); err != nil {
		return nil, nil, errors.Wrap(err, "failed to unmarshal attestation")
	}

	if len(sourceAttestations) == 0 {
		return nil, nil, errors.New("no data submitted")
	}

	var validAttestations []*eth.SingleAttestation
	for i, sourceAtt := range sourceAttestations {
		att, err := sourceAtt.ToConsensus()
		if err != nil {
			attFailures = append(attFailures, &server.IndexedError{
				Index:   i,
				Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
			})
			continue
		}
		if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
			attFailures = append(attFailures, &server.IndexedError{
				Index:   i,
				Message: "Incorrect attestation signature: " + err.Error(),
			})
			continue
		}
		validAttestations = append(validAttestations, att)
	}

	// We store the error for the first failed broadcast and use it in the log message in case
	// there are broadcast issues. Having a single log at the end instead of logging
	// for every failed broadcast prevents log noise in case there are many failures.
	// Even though we only retain the first error, there is a very good chance that all
	// broadcasts fail for the same reason, so this should be sufficient in most cases.
	var broadcastErr error

	for i, singleAtt := range validAttestations {
		s.OperationNotifier.OperationFeed().Send(&feed.Event{
			Type: operation.SingleAttReceived,
			Data: &operation.SingleAttReceivedData{
				Attestation: singleAtt,
			},
		})

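		// Expand the single attestation into a full AttestationElectra by resolving
		// the attester's committee from the target state.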
		targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
		if err != nil {
			return nil, nil, errors.Wrap(err, "could not get target state for attestation")
		}
		committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
		if err != nil {
			return nil, nil, errors.Wrap(err, "could not get committee for attestation")
		}
		att := singleAtt.ToAttestationElectra(committee)

		wantedEpoch := slots.ToEpoch(att.Data.Slot)
		vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
		if err != nil {
			return nil, nil, errors.Wrap(err, "could not get head validator indices")
		}
		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
		if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
			failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
				Index:   i,
				Message: server.NewBroadcastFailedError("SingleAttestation", err).Error(),
			})
			if broadcastErr == nil {
				broadcastErr = err
			}
			continue
		}

		if features.Get().EnableExperimentalAttestationPool {
			if err = s.AttestationCache.Add(att); err != nil {
				log.WithError(err).Error("Could not save attestation")
			}
		} else {
			if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
				log.WithError(err).Error("Could not save attestation")
			}
		}
	}

	if len(failedBroadcasts) > 0 {
		log.WithFields(logrus.Fields{
			"failedCount": len(failedBroadcasts),
			"totalCount":  len(validAttestations),
		}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
	}

	return attFailures, failedBroadcasts, nil
}

func (s *Server) handleAttestations(
	ctx context.Context,
	data json.RawMessage,
) (attFailures []*server.IndexedError, failedBroadcasts []*server.IndexedError, err error) {
	var sourceAttestations []*structs.Attestation

	if slots.ToEpoch(s.TimeFetcher.CurrentSlot()) >= params.BeaconConfig().ElectraForkEpoch {
		return nil, nil, errors.New("old attestation format, only electra attestations should be sent")
	}

	if err = json.Unmarshal(data, &sourceAttestations); err != nil {
		return nil, nil, errors.Wrap(err, "failed to unmarshal attestation")
	}

	if len(sourceAttestations) == 0 {
		return nil, nil, errors.New("no data submitted")
	}

	var validAttestations []*eth.Attestation
	for i, sourceAtt := range sourceAttestations {
		att, err := sourceAtt.ToConsensus()
		if err != nil {
			attFailures = append(attFailures, &server.IndexedError{
				Index:   i,
				Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
			})
			continue
		}
		if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
			attFailures = append(attFailures, &server.IndexedError{
				Index:   i,
				Message: "Incorrect attestation signature: " + err.Error(),
			})
			continue
		}
		validAttestations = append(validAttestations, att)
	}

	// We store the error for the first failed broadcast and use it in the log message in case
	// there are broadcast issues. Having a single log at the end instead of logging
	// for every failed broadcast prevents log noise in case there are many failures.
	// Even though we only retain the first error, there is a very good chance that all
	// broadcasts fail for the same reason, so this should be sufficient in most cases.
	var broadcastErr error

	for i, att := range validAttestations {
		// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
		// of a received unaggregated attestation.
		// Note we can't send for aggregated att because we don't have selection proof.
		if !att.IsAggregated() {
			s.OperationNotifier.OperationFeed().Send(&feed.Event{
				Type: operation.UnaggregatedAttReceived,
				Data: &operation.UnAggregatedAttReceivedData{
					Attestation: att,
				},
			})
		}

		wantedEpoch := slots.ToEpoch(att.Data.Slot)
		vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
		if err != nil {
			return nil, nil, errors.Wrap(err, "could not get head validator indices")
		}

		subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)
		if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
			failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
				Index:   i,
				Message: server.NewBroadcastFailedError("Attestation", err).Error(),
			})
			if broadcastErr == nil {
				broadcastErr = err
			}
			continue
		}

		if features.Get().EnableExperimentalAttestationPool {
			if err = s.AttestationCache.Add(att); err != nil {
				log.WithError(err).Error("Could not save attestation")
			}
		} else if att.IsAggregated() {
			if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
				log.WithError(err).Error("Could not save aggregated attestation")
			}
		} else {
			if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
				log.WithError(err).Error("Could not save unaggregated attestation")
			}
		}
	}

	if len(failedBroadcasts) > 0 {
		log.WithFields(logrus.Fields{
			"failedCount": len(failedBroadcasts),
			"totalCount":  len(validAttestations),
		}).WithError(broadcastErr).Error("Some attestations failed to be broadcast")
	}

	return attFailures, failedBroadcasts, nil
}

// ListVoluntaryExits retrieves voluntary exits known by the node but
// not necessarily incorporated into any block.
func (s *Server) ListVoluntaryExits(w http.ResponseWriter, r *http.Request) {
	_, span := trace.StartSpan(r.Context(), "beacon.ListVoluntaryExits")
	defer span.End()

	sourceExits, err := s.VoluntaryExitsPool.PendingExits()
	if err != nil {
		httputil.HandleError(w, "Could not get exits from the pool: "+err.Error(), http.StatusInternalServerError)
		return
	}
	exits := make([]*structs.SignedVoluntaryExit, len(sourceExits))
	for i, e := range sourceExits {
		exits[i] = structs.SignedExitFromConsensus(e)
	}

	httputil.WriteJson(w, &structs.ListVoluntaryExitsResponse{Data: exits})
}

// SubmitVoluntaryExit submits a SignedVoluntaryExit object to the node's pool
// and, if it passes validation, the node MUST broadcast it to the network.
func (s *Server) SubmitVoluntaryExit(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitVoluntaryExit")
	defer span.End()

	var req structs.SignedVoluntaryExit
	err := json.NewDecoder(r.Body).Decode(&req)
	switch {
	case errors.Is(err, io.EOF):
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	case err != nil:
		httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
		return
	}

	exit, err := req.ToConsensus()
	if err != nil {
		httputil.HandleError(w, "Could not convert request exit to consensus exit: "+err.Error(), http.StatusBadRequest)
		return
	}

	headState, err := s.ChainInfoFetcher.HeadState(ctx)
	if err != nil {
		httputil.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
		return
	}
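	// Advance the head state to the start of the exit's epoch so that signature
	// verification runs against the correct fork domain.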
	epochStart, err := slots.EpochStart(exit.Exit.Epoch)
	if err != nil {
		httputil.HandleError(w, "Could not get epoch start: "+err.Error(), http.StatusInternalServerError)
		return
	}
	headState, err = transition.ProcessSlotsIfPossible(ctx, headState, epochStart)
	if err != nil {
		httputil.HandleError(w, "Could not process slots: "+err.Error(), http.StatusInternalServerError)
		return
	}
	val, err := headState.ValidatorAtIndexReadOnly(exit.Exit.ValidatorIndex)
	if err != nil {
		if errors.Is(err, mvslice.ErrOutOfBounds) {
			httputil.HandleError(w, "Could not get validator: "+err.Error(), http.StatusBadRequest)
			return
		}
		httputil.HandleError(w, "Could not get validator: "+err.Error(), http.StatusInternalServerError)
		return
	}
	if err = blocks.VerifyExitAndSignature(val, headState, exit); err != nil {
		httputil.HandleError(w, "Invalid exit: "+err.Error(), http.StatusBadRequest)
		return
	}

	s.VoluntaryExitsPool.InsertVoluntaryExit(exit)
	if err = s.Broadcaster.Broadcast(ctx, exit); err != nil {
		httputil.HandleError(w, "Could not broadcast exit: "+err.Error(), http.StatusInternalServerError)
		return
	}
}

// SubmitSyncCommitteeSignatures submits sync committee signature objects to the node.
func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
	defer span.End()

	var req structs.SubmitSyncCommitteeSignaturesRequest
	err := json.NewDecoder(r.Body).Decode(&req.Data)
	switch {
	case errors.Is(err, io.EOF):
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	case err != nil:
		httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
		return
	}
	if len(req.Data) == 0 {
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	}

	var validMessages []*eth.SyncCommitteeMessage
	var msgFailures []*server.IndexedError
	for i, sourceMsg := range req.Data {
		msg, err := sourceMsg.ToConsensus()
		if err != nil {
			msgFailures = append(msgFailures, &server.IndexedError{
				Index:   i,
				Message: "Could not convert request message to consensus message: " + err.Error(),
			})
			continue
		}
		validMessages = append(validMessages, msg)
	}

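	// Forward each valid message to the core service; the first submission
	// failure aborts the request with the mapped HTTP status.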
	for _, msg := range validMessages {
		if rpcerr := s.CoreService.SubmitSyncMessage(ctx, msg); rpcerr != nil {
			httputil.HandleError(w, "Could not submit message: "+rpcerr.Err.Error(), core.ErrorReasonToHTTP(rpcerr.Reason))
			return
		}
	}

	if len(msgFailures) > 0 {
		failuresErr := &server.IndexedErrorContainer{
			Code:     http.StatusBadRequest,
			Message:  "One or more messages failed validation",
			Failures: msgFailures,
		}
		httputil.WriteError(w, failuresErr)
	}
}

// SubmitBLSToExecutionChanges submits SignedBLSToExecutionChange objects to the node's pool;
// if a change passes validation, the node must broadcast it to the network.
func (s *Server) SubmitBLSToExecutionChanges(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitBLSToExecutionChanges")
	defer span.End()
	st, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		httputil.HandleError(w, fmt.Sprintf("Could not get head state: %v", err), http.StatusInternalServerError)
		return
	}
	var failures []*server.IndexedError
	var toBroadcast []*eth.SignedBLSToExecutionChange

	var req []*structs.SignedBLSToExecutionChange
	err = json.NewDecoder(r.Body).Decode(&req)
	switch {
	case errors.Is(err, io.EOF):
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	case err != nil:
		httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
		return
	}
	if len(req) == 0 {
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	}

	for i, change := range req {
		sbls, err := change.ToConsensus()
		if err != nil {
			failures = append(failures, &server.IndexedError{
				Index:   i,
				Message: "Unable to decode SignedBLSToExecutionChange: " + err.Error(),
			})
			continue
		}
		_, err = blocks.ValidateBLSToExecutionChange(st, sbls)
		if err != nil {
			failures = append(failures, &server.IndexedError{
				Index:   i,
				Message: "Could not validate SignedBLSToExecutionChange: " + err.Error(),
			})
			continue
		}
		if err := blocks.VerifyBLSChangeSignature(st, sbls); err != nil {
			failures = append(failures, &server.IndexedError{
				Index:   i,
				Message: "Could not validate signature: " + err.Error(),
			})
			continue
		}
		s.OperationNotifier.OperationFeed().Send(&feed.Event{
			Type: operation.BLSToExecutionChangeReceived,
			Data: &operation.BLSToExecutionChangeReceivedData{
				Change: sbls,
			},
		})
		s.BLSChangesPool.InsertBLSToExecChange(sbls)
		if st.Version() >= version.Capella {
			toBroadcast = append(toBroadcast, sbls)
		}
	}
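	// Broadcast asynchronously with a fresh context so the rate-limited broadcast
	// loop does not block this HTTP response.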
	go s.broadcastBLSChanges(context.Background(), toBroadcast)
	if len(failures) > 0 {
		failuresErr := &server.IndexedErrorContainer{
			Code:     http.StatusBadRequest,
			Message:  server.ErrIndexedValidationFail,
			Failures: failures,
		}
		httputil.WriteError(w, failuresErr)
	}
}

// broadcastBLSBatch broadcasts the first `broadcastBLSChangesRateLimit` messages from the slice pointed to by ptr.
// It validates the messages again because they could have been invalidated by being included in blocks since the last validation.
// It removes the messages from the slice and modifies it in place.
func (s *Server) broadcastBLSBatch(ctx context.Context, ptr *[]*eth.SignedBLSToExecutionChange) {
	limit := min(len(*ptr), broadcastBLSChangesRateLimit)
	st, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		log.WithError(err).Error("Could not get head state")
		return
	}
	for _, ch := range (*ptr)[:limit] {
		if ch != nil {
			_, err := blocks.ValidateBLSToExecutionChange(st, ch)
			if err != nil {
				log.WithError(err).Error("Could not validate BLS to execution change")
				continue
			}
			if err := s.Broadcaster.Broadcast(ctx, ch); err != nil {
				log.WithError(err).Error("Could not broadcast BLS to execution changes.")
			}
		}
	}
	*ptr = (*ptr)[limit:]
}

func (s *Server) broadcastBLSChanges(ctx context.Context, changes []*eth.SignedBLSToExecutionChange) {
	s.broadcastBLSBatch(ctx, &changes)
	if len(changes) == 0 {
		return
	}

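	// Drain the remaining changes in rate-limited batches, one batch every 500ms.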
	ticker := time.NewTicker(500 * time.Millisecond)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.broadcastBLSBatch(ctx, &changes)
			if len(changes) == 0 {
				return
			}
		}
	}
}

// ListBLSToExecutionChanges retrieves BLS to execution changes known by the node but not necessarily incorporated into any block.
func (s *Server) ListBLSToExecutionChanges(w http.ResponseWriter, r *http.Request) {
	_, span := trace.StartSpan(r.Context(), "beacon.ListBLSToExecutionChanges")
	defer span.End()

	sourceChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
	if err != nil {
		httputil.HandleError(w, fmt.Sprintf("Could not get BLS to execution changes: %v", err), http.StatusInternalServerError)
		return
	}

	httputil.WriteJson(w, &structs.BLSToExecutionChangesPoolResponse{
		Data: structs.SignedBLSChangesFromConsensus(sourceChanges),
	})
}

// GetAttesterSlashingsV2 retrieves attester slashings known by the node but
// not necessarily incorporated into any block, supporting both AttesterSlashing and AttesterSlashingElectra.
func (s *Server) GetAttesterSlashingsV2(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.GetAttesterSlashingsV2")
	defer span.End()

	v := slots.ToForkVersion(s.TimeFetcher.CurrentSlot())
	headState, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		httputil.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
		return
	}

	sourceSlashings := s.SlashingsPool.PendingAttesterSlashings(ctx, headState, true /* return unlimited slashings */)
	attStructs := make([]any, 0, len(sourceSlashings))
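	// The pool may hold both pre- and post-Electra slashings; include only those
	// that match the current fork version and skip the rest.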
	for _, slashing := range sourceSlashings {
		var attStruct any
		if v >= version.Electra && slashing.Version() >= version.Electra {
			a, ok := slashing.(*eth.AttesterSlashingElectra)
			if !ok {
				httputil.HandleError(w, fmt.Sprintf("Unable to convert slashing of type %T to an Electra slashing", slashing), http.StatusInternalServerError)
				return
			}
			attStruct = structs.AttesterSlashingElectraFromConsensus(a)
		} else if v < version.Electra && slashing.Version() < version.Electra {
			a, ok := slashing.(*eth.AttesterSlashing)
			if !ok {
				httputil.HandleError(w, fmt.Sprintf("Unable to convert slashing of type %T to a Phase0 slashing", slashing), http.StatusInternalServerError)
				return
			}
			attStruct = structs.AttesterSlashingFromConsensus(a)
		} else {
			continue
		}
		attStructs = append(attStructs, attStruct)
	}

	attBytes, err := json.Marshal(attStructs)
	if err != nil {
		httputil.HandleError(w, fmt.Sprintf("Failed to marshal slashing: %v", err), http.StatusInternalServerError)
		return
	}

	resp := &structs.GetAttesterSlashingsResponse{
		Version: version.String(v),
		Data:    attBytes,
	}
	w.Header().Set(api.VersionHeader, version.String(v))
	httputil.WriteJson(w, resp)
}

// SubmitAttesterSlashingsV2 submits an attester slashing object to the node's pool
// and, if it passes validation, the node MUST broadcast it to the network.
func (s *Server) SubmitAttesterSlashingsV2(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttesterSlashingsV2")
	defer span.End()

	versionHeader := r.Header.Get(api.VersionHeader)
	if versionHeader == "" {
		httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
		return
	}
	v, err := version.FromString(versionHeader)
	if err != nil {
		httputil.HandleError(w, "Invalid version: "+err.Error(), http.StatusBadRequest)
		return
	}

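	// The request body schema depends on the fork version claimed in the header:
	// Electra and later use AttesterSlashingElectra, earlier forks use AttesterSlashing.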
	if v >= version.Electra {
		var req structs.AttesterSlashingElectra
		err := json.NewDecoder(r.Body).Decode(&req)
		switch {
		case errors.Is(err, io.EOF):
			httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
			return
		case err != nil:
			httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
			return
		}

		slashing, err := req.ToConsensus()
		if err != nil {
			httputil.HandleError(w, "Could not convert request slashing to consensus slashing: "+err.Error(), http.StatusBadRequest)
			return
		}
		s.submitAttesterSlashing(w, ctx, slashing)
	} else {
		var req structs.AttesterSlashing
		err := json.NewDecoder(r.Body).Decode(&req)
		switch {
		case errors.Is(err, io.EOF):
			httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
			return
		case err != nil:
			httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
			return
		}

		slashing, err := req.ToConsensus()
		if err != nil {
			httputil.HandleError(w, "Could not convert request slashing to consensus slashing: "+err.Error(), http.StatusBadRequest)
			return
		}
		s.submitAttesterSlashing(w, ctx, slashing)
	}
}

func (s *Server) submitAttesterSlashing(
	w http.ResponseWriter,
	ctx context.Context,
	slashing eth.AttSlashing,
) {
	headState, err := s.ChainInfoFetcher.HeadState(ctx)
	if err != nil {
		httputil.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
		return
	}
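	// Advance the head state to the slashing's slot so verification runs in the
	// correct epoch context.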
	headState, err = transition.ProcessSlotsIfPossible(ctx, headState, slashing.FirstAttestation().GetData().Slot)
	if err != nil {
		httputil.HandleError(w, "Could not process slots: "+err.Error(), http.StatusInternalServerError)
		return
	}

	err = blocks.VerifyAttesterSlashing(ctx, headState, slashing)
	if err != nil {
		httputil.HandleError(w, "Invalid attester slashing: "+err.Error(), http.StatusBadRequest)
		return
	}
	err = s.SlashingsPool.InsertAttesterSlashing(ctx, headState, slashing)
	if err != nil {
		httputil.HandleError(w, "Could not insert attester slashing into pool: "+err.Error(), http.StatusInternalServerError)
		return
	}
	// notify events
	s.OperationNotifier.OperationFeed().Send(&feed.Event{
		Type: operation.AttesterSlashingReceived,
		Data: &operation.AttesterSlashingReceivedData{
			AttesterSlashing: slashing,
		},
	})
	if !features.Get().DisableBroadcastSlashings {
		if err = s.Broadcaster.Broadcast(ctx, slashing); err != nil {
			httputil.HandleError(w, "Could not broadcast slashing object: "+err.Error(), http.StatusInternalServerError)
			return
		}
	}
}

// GetProposerSlashings retrieves proposer slashings known by the node
// but not necessarily incorporated into any block.
func (s *Server) GetProposerSlashings(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.GetProposerSlashings")
	defer span.End()

	headState, err := s.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		httputil.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
		return
	}
	sourceSlashings := s.SlashingsPool.PendingProposerSlashings(ctx, headState, true /* return unlimited slashings */)
	slashings := structs.ProposerSlashingsFromConsensus(sourceSlashings)

	httputil.WriteJson(w, &structs.GetProposerSlashingsResponse{Data: slashings})
}

// SubmitProposerSlashing submits a proposer slashing object to the node's pool
// and, if it passes validation, the node MUST broadcast it to the network.
func (s *Server) SubmitProposerSlashing(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitProposerSlashing")
	defer span.End()

	var req structs.ProposerSlashing
	err := json.NewDecoder(r.Body).Decode(&req)
	switch {
	case errors.Is(err, io.EOF):
		httputil.HandleError(w, "No data submitted", http.StatusBadRequest)
		return
	case err != nil:
		httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
		return
	}

	slashing, err := req.ToConsensus()
	if err != nil {
		httputil.HandleError(w, "Could not convert request slashing to consensus slashing: "+err.Error(), http.StatusBadRequest)
		return
	}
	headState, err := s.ChainInfoFetcher.HeadState(ctx)
	if err != nil {
		httputil.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
		return
	}
	headState, err = transition.ProcessSlotsIfPossible(ctx, headState, slashing.Header_1.Header.Slot)
	if err != nil {
		httputil.HandleError(w, "Could not process slots: "+err.Error(), http.StatusInternalServerError)
		return
	}
	err = blocks.VerifyProposerSlashing(headState, slashing)
	if err != nil {
		httputil.HandleError(w, "Invalid proposer slashing: "+err.Error(), http.StatusBadRequest)
		return
	}

	err = s.SlashingsPool.InsertProposerSlashing(ctx, headState, slashing)
	if err != nil {
		httputil.HandleError(w, "Could not insert proposer slashing into pool: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// notify events
	s.OperationNotifier.OperationFeed().Send(&feed.Event{
		Type: operation.ProposerSlashingReceived,
		Data: &operation.ProposerSlashingReceivedData{
			ProposerSlashing: slashing,
		},
	})

	if !features.Get().DisableBroadcastSlashings {
		if err = s.Broadcaster.Broadcast(ctx, slashing); err != nil {
			httputil.HandleError(w, "Could not broadcast slashing object: "+err.Error(), http.StatusInternalServerError)
			return
		}
	}
}