Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
9efacb5c3c Proposer rpc builds beacon block in parallel 2023-04-07 08:33:03 -07:00
2 changed files with 94 additions and 72 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -24,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@@ -39,6 +41,11 @@ const eth1dataTimeout = 2 * time.Second
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot.
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
startTime, err := slots.ToTime(uint64(vs.TimeFetcher.GenesisTime().Unix()), req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get start time: %v", err)
}
ctx, span := trace.StartSpan(ctx, "ProposerServer.GetBeaconBlock")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))
@@ -82,50 +89,92 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
sBlk.SetRandaoReveal(req.RandaoReveal)
sBlk.SetParentRoot(parentRoot[:])
// Set eth1 data.
eth1Data, err := vs.eth1DataMajorityVote(ctx, head)
if err != nil {
eth1Data = &ethpb.Eth1Data{DepositRoot: params.BeaconConfig().ZeroHash[:], BlockHash: params.BeaconConfig().ZeroHash[:]}
log.WithError(err).Error("Could not get eth1data")
}
sBlk.SetEth1Data(eth1Data)
var wg sync.WaitGroup
// Set deposit and attestation.
deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data) // TODO: split attestations and deposits
if err != nil {
sBlk.SetDeposits([]*ethpb.Deposit{})
sBlk.SetAttestations([]*ethpb.Attestation{})
log.WithError(err).Error("Could not pack deposits and attestations")
} else {
sBlk.SetDeposits(deposits)
sBlk.SetAttestations(atts)
}
wg.Add(1)
var eth1Data *ethpb.Eth1Data
go func() {
defer wg.Done()
eth1Data, err := vs.eth1DataMajorityVote(ctx, head)
if err != nil {
eth1Data = &ethpb.Eth1Data{DepositRoot: params.BeaconConfig().ZeroHash[:], BlockHash: params.BeaconConfig().ZeroHash[:]}
log.WithError(err).Error("Could not get eth1data")
}
sBlk.SetEth1Data(eth1Data)
}()
// Set proposer index.
idx, err := helpers.BeaconProposerIndex(ctx, head)
if err != nil {
wg.Add(1)
go func() {
defer wg.Done()
atts, err := vs.packAttestations(ctx, head)
if err != nil {
sBlk.SetAttestations([]*ethpb.Attestation{})
log.WithError(err).Error("Could not pack attestations")
} else {
sBlk.SetAttestations(atts)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
deposits, err := vs.packDeposit(ctx, head, eth1Data)
if err != nil {
sBlk.SetDeposits([]*ethpb.Deposit{})
log.WithError(err).Error("Could not pack deposits")
} else {
sBlk.SetDeposits(deposits)
}
}()
wg.Add(1)
if err := func() error {
defer wg.Done()
idx, err := helpers.BeaconProposerIndex(ctx, head)
if err != nil {
return err
}
sBlk.SetProposerIndex(idx)
return nil
}(); err != nil {
return nil, fmt.Errorf("could not calculate proposer index %v", err)
}
sBlk.SetProposerIndex(idx)
// Set slashings.
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
sBlk.SetProposerSlashings(validProposerSlashings)
sBlk.SetAttesterSlashings(validAttSlashings)
wg.Add(1)
go func() {
defer wg.Done()
validProposerSlashings, validAttSlashings := vs.getSlashings(ctx, head)
sBlk.SetProposerSlashings(validProposerSlashings)
sBlk.SetAttesterSlashings(validAttSlashings)
}()
// Set exits.
sBlk.SetVoluntaryExits(vs.getExits(head, req.Slot))
wg.Add(1)
go func() {
wg.Done()
sBlk.SetVoluntaryExits(vs.getExits(head, req.Slot))
}()
// Set sync aggregate. New in Altair.
vs.setSyncAggregate(ctx, sBlk)
wg.Add(1)
go func() {
wg.Done()
vs.setSyncAggregate(ctx, sBlk)
}()
// Set execution data. New in Bellatrix.
if err := vs.setExecutionData(ctx, sBlk, head); err != nil {
wg.Add(1)
if err := func() error {
defer wg.Done()
return vs.setExecutionData(ctx, sBlk, head)
}(); err != nil {
return nil, status.Errorf(codes.Internal, "Could not set execution data: %v", err)
}
// Set bls to execution change. New in Capella.
vs.setBlsToExecData(sBlk, head)
wg.Add(1)
go func() {
wg.Done()
vs.setBlsToExecData(sBlk, head)
}()
wg.Wait()
sr, err := vs.computeStateRoot(ctx, sBlk)
if err != nil {
@@ -152,6 +201,12 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().AltairForkEpoch {
return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Altair{Altair: pb.(*ethpb.BeaconBlockAltair)}}, nil
}
currentSlotStartTime := vs.TimeFetcher.GenesisTime().Add(time.Duration(int64(uint64(vs.TimeFetcher.CurrentSlot()) * params.BeaconConfig().SecondsPerSlot)))
log.Info("Returned block to proposer in #1", prysmTime.Since(currentSlotStartTime).Milliseconds())
log.Info("Returned block to proposer in #2", prysmTime.Since(startTime).Milliseconds())
return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Phase0{Phase0: pb.(*ethpb.BeaconBlock)}}, nil
}

View File

@@ -12,49 +12,16 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (vs *Server) packDepositsAndAttestations(ctx context.Context, head state.BeaconState, eth1Data *ethpb.Eth1Data) ([]*ethpb.Deposit, []*ethpb.Attestation, error) {
eg, egctx := errgroup.WithContext(ctx)
var deposits []*ethpb.Deposit
var atts []*ethpb.Attestation
eg.Go(func() error {
// Pack ETH1 deposits which have not been included in the beacon chain.
localDeposits, err := vs.deposits(egctx, head, eth1Data)
if err != nil {
return status.Errorf(codes.Internal, "Could not get ETH1 deposits: %v", err)
}
// if the original context is cancelled, then cancel this routine too
select {
case <-egctx.Done():
return egctx.Err()
default:
}
deposits = localDeposits
return nil
})
eg.Go(func() error {
// Pack aggregated attestations which have not been included in the beacon chain.
localAtts, err := vs.packAttestations(egctx, head)
if err != nil {
return status.Errorf(codes.Internal, "Could not get attestations to pack into block: %v", err)
}
// if the original context is cancelled, then cancel this routine too
select {
case <-egctx.Done():
return egctx.Err()
default:
}
atts = localAtts
return nil
})
return deposits, atts, eg.Wait()
func (vs *Server) packDeposit(ctx context.Context, head state.BeaconState, eth1Data *ethpb.Eth1Data) ([]*ethpb.Deposit, error) {
d, err := vs.deposits(ctx, head, eth1Data)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get ETH1 deposits: %v", err)
}
return d, nil
}
// deposits returns a list of pending deposits that are ready for inclusion in the next beacon