Compare commits

...

12 Commits

Author SHA1 Message Date
Kasey Kirkham
9493af1cdc consistently use slot instead of version 2025-09-18 15:24:26 -05:00
Kasey Kirkham
8a270d2a5c build sidecars during block publish 2025-09-18 15:07:14 -05:00
Manu NALEPA
9b551959c4 Take notifier out of the critical section 2025-09-18 18:57:12 +02:00
Manu NALEPA
8f9026bed8 Fix deadlock-option 2. 2025-09-18 18:53:27 +02:00
Manu NALEPA
2568e2e087 Fix deadlock-option 1. 2025-09-18 18:50:00 +02:00
Manu NALEPA
62a4fca4d5 Add changelog 2025-09-18 18:16:12 +02:00
Manu NALEPA
7059cf4cf2 broadcastReceiveBlock: Add log. 2025-09-18 18:14:09 +02:00
Manu NALEPA
5c64cb9eb6 ProposeBeaconBlock: First broadcast/receive block, and then sidecars. 2025-09-18 18:07:03 +02:00
Manu NALEPA
1bf2188f81 broadcastAndReceiveDataColumns: Broadcast and receive data columns in parallel. 2025-09-18 16:03:59 +02:00
Manu NALEPA
d78cb1fd67 Broadcasted data column sidecar log: Add blobCount. 2025-09-18 16:03:33 +02:00
terence
900f162467 fix: use v2 endpoint for blinded block submission post-Fulu (#15716) 2025-09-18 00:27:46 +00:00
hyunchel
5266d34a22 Fix misleading log msg on shutdown (#13063)
* Fix misleading log msg on shutdown

gRPCServer.GracefulStop blocks until it has been shutdown. Logging
"Initiated graceful stop" after it has been completed is misleading.
Names are added to the message to discern services. Also, a minimum test
is added mainly to verify the change made with this commit.

* Add changelog fragment file

* Capitalize log messages

* Update endtoend test for fixed log messages

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-09-17 22:14:40 +00:00
12 changed files with 231 additions and 104 deletions

View File

@@ -30,10 +30,11 @@ import (
)
const (
getExecHeaderPath = "/eth/v1/builder/header/{{.Slot}}/{{.ParentHash}}/{{.Pubkey}}"
getStatus = "/eth/v1/builder/status"
postBlindedBeaconBlockPath = "/eth/v1/builder/blinded_blocks"
postRegisterValidatorPath = "/eth/v1/builder/validators"
getExecHeaderPath = "/eth/v1/builder/header/{{.Slot}}/{{.ParentHash}}/{{.Pubkey}}"
getStatus = "/eth/v1/builder/status"
postBlindedBeaconBlockPath = "/eth/v1/builder/blinded_blocks"
postBlindedBeaconBlockV2Path = "/eth/v2/builder/blinded_blocks"
postRegisterValidatorPath = "/eth/v1/builder/validators"
)
var (
@@ -512,7 +513,7 @@ func (c *Client) SubmitBlindedBlockPostFulu(ctx context.Context, sb interfaces.R
}
// Post the blinded block - the response should only contain a status code (no payload)
_, _, err = c.do(ctx, http.MethodPost, postBlindedBeaconBlockPath, bytes.NewBuffer(body), http.StatusAccepted, postOpts)
_, _, err = c.do(ctx, http.MethodPost, postBlindedBeaconBlockV2Path, bytes.NewBuffer(body), http.StatusAccepted, postOpts)
if err != nil {
return errors.Wrap(err, "error posting the blinded block to the builder api post-Fulu")
}

View File

@@ -1561,7 +1561,7 @@ func TestSubmitBlindedBlockPostFulu(t *testing.T) {
t.Run("success", func(t *testing.T) {
hc := &http.Client{
Transport: roundtrip(func(r *http.Request) (*http.Response, error) {
require.Equal(t, postBlindedBeaconBlockPath, r.URL.Path)
require.Equal(t, postBlindedBeaconBlockV2Path, r.URL.Path)
require.Equal(t, "bellatrix", r.Header.Get("Eth-Consensus-Version"))
require.Equal(t, api.JsonMediaType, r.Header.Get("Content-Type"))
require.Equal(t, api.JsonMediaType, r.Header.Get("Accept"))
@@ -1586,7 +1586,7 @@ func TestSubmitBlindedBlockPostFulu(t *testing.T) {
t.Run("success_ssz", func(t *testing.T) {
hc := &http.Client{
Transport: roundtrip(func(r *http.Request) (*http.Response, error) {
require.Equal(t, postBlindedBeaconBlockPath, r.URL.Path)
require.Equal(t, postBlindedBeaconBlockV2Path, r.URL.Path)
require.Equal(t, "bellatrix", r.Header.Get(api.VersionHeader))
require.Equal(t, api.OctetStreamMediaType, r.Header.Get("Content-Type"))
require.Equal(t, api.OctetStreamMediaType, r.Header.Get("Accept"))
@@ -1612,7 +1612,7 @@ func TestSubmitBlindedBlockPostFulu(t *testing.T) {
t.Run("error_response", func(t *testing.T) {
hc := &http.Client{
Transport: roundtrip(func(r *http.Request) (*http.Response, error) {
require.Equal(t, postBlindedBeaconBlockPath, r.URL.Path)
require.Equal(t, postBlindedBeaconBlockV2Path, r.URL.Path)
require.Equal(t, "bellatrix", r.Header.Get("Eth-Consensus-Version"))
message := ErrorMessage{
Code: 400,

View File

@@ -381,6 +381,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
"columnSubnet": columnSubnet,
"blobCount": len(dataColumnSidecar.Column),
}).Debug("Broadcasted data column sidecar")
// Increase the number of successful broadcasts.

View File

@@ -279,86 +279,218 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
//
// ProposeBeaconBlock handles the proposal of beacon blocks.
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
var (
blobSidecars []*ethpb.BlobSidecar
dataColumnSidecars []blocks.RODataColumn
)
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()
if req == nil {
return nil, status.Errorf(codes.InvalidArgument, "empty request")
}
block, err := blocks.NewSignedBeaconBlock(req.Block)
prop, err := unblindProposalRequest(ctx, req, vs.BlockBuilder)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
}
root, err := block.Block().HashTreeRoot()
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
return nil, status.Errorf(codes.Internal, "could not unblind block: %v", err)
}
// For post-Fulu blinded blocks, submit to relay and return early
if block.IsBlinded() && slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
err := vs.BlockBuilder.SubmitBlindedBlockPostFulu(ctx, block)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not submit blinded block post-Fulu: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
if !prop.shouldBroadcastBlock() {
return &ethpb.ProposeResponse{BlockRoot: prop.block.RootSlice()}, nil
}
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Add(1)
ec := make(chan error)
go func() {
defer wg.Done()
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
if err := vs.broadcastBlock(ctx, prop.block); err != nil {
ec <- errors.Wrap(err, "broadcast block")
} else {
ec <- nil
}
errChan <- nil
}()
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
// build sidecars concurrently with block broadcast
if err := prop.buildSidecars(); err != nil {
return nil, status.Errorf(codes.Internal, "could not build sidecars: %v", err)
}
wg.Wait()
if err := <-errChan; err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
// wait for result of block broadcast
if err := <-ec; err != nil {
return nil, status.Errorf(codes.Internal, "broadcast block failed: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
if prop.shouldBroadcastColumns() {
cols, err := prop.buildColumns()
if err != nil {
return nil, status.Errorf(codes.Internal, "could not construct data columns: %v", err)
}
if err := vs.broadcastAndReceiveDataColumns(ctx, cols, prop.block.Root()); err != nil {
return nil, status.Errorf(codes.Internal, "could not broadcast and receive data columns: %v", err)
}
} else if prop.shouldBroadcastBlobs() {
blobs, err := prop.buildBlobs()
if err != nil {
return nil, status.Errorf(codes.Internal, "could not construct blob sidecars: %v", err)
}
if err := vs.broadcastAndReceiveBlobs(ctx, blobs, prop.block.Root()); err != nil {
return nil, status.Errorf(codes.Internal, "could not broadcast and receive blob sidecars: %v", err)
}
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: prop.block},
})
if err := vs.BlockReceiver.ReceiveBlock(ctx, prop.block, prop.block.Root(), nil); err != nil {
return nil, status.Errorf(codes.Internal, "receive block failed: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: prop.block.RootSlice()}, nil
}
type proposal struct {
block blocks.ROBlock
epoch primitives.Epoch
req *ethpb.GenericSignedBeaconBlock
blobs []*ethpb.BlobSidecar
columns []blocks.RODataColumn
}
func parsedProposal(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock) (*proposal, error) {
rob, err := blocks.NewROBlock(block)
if err != nil {
return nil, errors.Wrap(err, "could not create read-only block")
}
return &proposal{
block: rob,
epoch: slots.ToEpoch(rob.Block().Slot()),
req: req,
}, nil
}
func unblindProposalRequest(ctx context.Context, req *ethpb.GenericSignedBeaconBlock, bldr builder.BlockBuilder) (*proposal, error) {
block, err := blocks.NewSignedBeaconBlock(req.Block)
if err != nil {
return nil, errors.Wrap(err, "decode block failed")
}
if slots.ToEpoch(block.Block().Slot()) < params.BeaconConfig().BellatrixForkEpoch || !block.IsBlinded() {
return parsedProposal(block, req)
}
if bldr == nil || !bldr.Configured() {
return nil, errors.New("can not propose blinded block without block builder configured")
}
if slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
if err := bldr.SubmitBlindedBlockPostFulu(ctx, block); err != nil {
return nil, status.Errorf(codes.Internal, "could not submit blinded block post-Fulu: %v", err)
}
return parsedProposal(block, req)
}
payload, bundle, err := bldr.SubmitBlindedBlock(ctx, block)
if err != nil {
return nil, errors.Wrap(err, "submit blinded block failed")
}
ublock, err := block.Copy()
if err != nil {
return nil, errors.Wrap(err, "copy block")
}
if err := ublock.Unblind(payload); err != nil {
return nil, errors.Wrap(err, "unblind failed")
}
unblinded, err := parsedProposal(ublock, req)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
sidecars, err := unblindBlobsSidecars(ublock, bundle)
if err != nil {
return nil, errors.Wrap(err, "unblind blobs sidecars: commitment value doesn't match block")
}
unblinded.blobs = sidecars
return unblinded, nil
}
func (u *proposal) buildSidecars() error {
var err error
if u.shouldBroadcastBlobs() {
u.blobs, err = u.buildBlobs()
if err != nil {
return errors.Wrap(err, "could not construct blob sidecars")
}
}
if u.shouldBroadcastColumns() {
u.columns, err = u.buildColumns()
if err != nil {
return errors.Wrap(err, "could not construct data columns")
}
}
return nil
}
func (u *proposal) buildColumns() ([]blocks.RODataColumn, error) {
if u.epoch < params.BeaconConfig().FuluForkEpoch {
return nil, nil // No data columns before fulu.
}
if len(u.columns) > 0 {
return u.columns, nil
}
rawBlobs, proofs, err := blobsAndProofs(u.req)
if err != nil {
return nil, err
}
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(u.block))
if err != nil {
return nil, errors.Wrap(err, "data column sidcars")
}
return roDataColumnSidecars, nil
}
func (u *proposal) buildBlobs() ([]*ethpb.BlobSidecar, error) {
if u.epoch < params.BeaconConfig().DenebForkEpoch || u.epoch >= params.BeaconConfig().FuluForkEpoch {
return nil, nil // No blobs before deneb, data column sidecars after fulu
}
if len(u.blobs) > 0 {
return u.blobs, nil
}
rawBlobs, proofs, err := blobsAndProofs(u.req)
if err != nil {
return nil, err
}
return BuildBlobSidecars(u.block, rawBlobs, proofs)
}
func (u *proposal) shouldBroadcastBlock() bool {
// we should broadcast if the block is unblinded, or blinded and pre-fulu
return !u.block.IsBlinded() || u.epoch < params.BeaconConfig().FuluForkEpoch
}
func (u *proposal) shouldBroadcastBlobs() bool {
return u.epoch >= params.BeaconConfig().DenebForkEpoch && u.epoch < params.BeaconConfig().FuluForkEpoch
}
func (u *proposal) shouldBroadcastColumns() bool {
return u.epoch >= params.BeaconConfig().FuluForkEpoch
}
// broadcastAndReceiveSidecars broadcasts and receives sidecars.
func (vs *Server) broadcastAndReceiveSidecars(
ctx context.Context,
block interfaces.SignedBeaconBlock,
root [fieldparams.RootLength]byte,
block blocks.ROBlock,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSidecars []blocks.RODataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, block.Root()); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
}
if err := vs.broadcastAndReceiveBlobs(ctx, blobSidecars, root); err != nil {
if err := vs.broadcastAndReceiveBlobs(ctx, blobSidecars, block.Root()); err != nil {
return errors.Wrap(err, "broadcast and receive blobs")
}
@@ -398,41 +530,13 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
return copiedBlock, sidecars, nil
}
func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block blocks.ROBlock) error {
if block.Version() >= version.Fulu {
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "build blob sidecars")
}
return blobSidecars, nil, nil
return nil
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
func (vs *Server) broadcastBlock(ctx context.Context, block blocks.ROBlock) error {
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
@@ -440,15 +544,20 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
return errors.Wrap(err, "broadcast failed")
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
})
return vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil)
log.WithFields(logrus.Fields{
"slot": block.Block().Slot(),
"root": fmt.Sprintf("%#x", block.Root()),
}).Debug("Broadcasted block")
return nil
}
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [fieldparams.RootLength]byte) error {
if len(sidecars) == 0 {
return nil
}
eg, eCtx := errgroup.WithContext(ctx)
for subIdx, sc := range sidecars {
eg.Go(func() error {
@@ -479,6 +588,9 @@ func (vs *Server) broadcastAndReceiveDataColumns(
roSidecars []blocks.RODataColumn,
root [fieldparams.RootLength]byte,
) error {
if len(roSidecars) == 0 {
return nil
}
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
eg, _ := errgroup.WithContext(ctx)
for _, roSidecar := range roSidecars {
@@ -498,10 +610,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "wait for data columns to be broadcasted")
}
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
return errors.Wrap(err, "receive data column")
}
@@ -512,6 +620,11 @@ func (vs *Server) broadcastAndReceiveDataColumns(
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "wait for data columns to be broadcasted")
}
return nil
}

View File

@@ -145,7 +145,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
log.WithError(err).Errorf("Could not listen to port in Start() %s", address)
}
s.listener = lis
log.WithField("address", address).Info("gRPC server listening on port")
log.WithField("address", address).Info("Beacon chain gRPC server listening")
opts := []grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler()),
@@ -351,7 +351,7 @@ func (s *Service) Stop() error {
s.cancel()
if s.listener != nil {
s.grpcServer.GracefulStop()
log.Debug("Initiated graceful stop of gRPC server")
log.Debug("Completed graceful stop of beacon-chain gRPC server")
}
return nil
}

View File

@@ -42,8 +42,9 @@ func TestLifecycle_OK(t *testing.T) {
rpcService.Start()
require.LogsContain(t, hook, "listening on port")
require.LogsContain(t, hook, "Beacon chain gRPC server listening")
assert.NoError(t, rpcService.Stop())
require.LogsContain(t, hook, "Completed graceful stop of beacon-chain gRPC server")
}
func TestStatus_CredentialError(t *testing.T) {
@@ -84,7 +85,7 @@ func TestRPC_InsecureEndpoint(t *testing.T) {
rpcService.Start()
require.LogsContain(t, hook, "listening on port")
require.LogsContain(t, hook, "Beacon chain gRPC server listening")
require.LogsContain(t, hook, "You are using an insecure gRPC server")
assert.NoError(t, rpcService.Stop())
}

View File

@@ -0,0 +1,3 @@
### Changed
- Clarified misleading log messages in beacon-chain/rpc/service gRPC module.

View File

@@ -0,0 +1,2 @@
### Changed
- Build DataColumnSidecars concurrently with block publishing.

View File

@@ -0,0 +1,4 @@
### Changed
- Broadcast block then sidecars, instead block and sidecars concurrently
- Broadcast and receive sidecars in concurrently, instead sequentially

View File

@@ -0,0 +1,3 @@
### Fixed
- Use v2 endpoint for blinded block submission post-Fulu

View File

@@ -308,7 +308,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
return fmt.Errorf("failed to start beacon node: %w", err)
}
if err = helpers.WaitForTextInFile(stdOutFile, "gRPC server listening on port"); err != nil {
if err = helpers.WaitForTextInFile(stdOutFile, "Beacon chain gRPC server listening"); err != nil {
return fmt.Errorf("could not find multiaddr for node %d, this means the node had issues starting: %w", index, err)
}

View File

@@ -58,5 +58,4 @@ func TestServer_InitializeRoutes(t *testing.T) {
}
}
}
}