Compare commits

...

3 Commits

Author SHA1 Message Date
Manu NALEPA
792fa22099 Add the --disable-get-blobs-v2 flag and fixes #16171 (#16155)
**What type of PR is this?**
Feature + Bugfix

**What does this PR do? Why is it needed?**
Starting at Fusaka, the beacon node can pull blobs with the
`engine_getBlobsV2` API from the execution layer.
This reduces by a lot the burden on the beacon node. However, the beacon
node should be able to work 100% correctly without this execution layer
help.

This PR introduces the `--disable-get-blobs-v2` flag to simulate a 0%
success rate of this engine API.

This PR also fixes:
- https://github.com/OffchainLabs/prysm/issues/16171

Please read commit by commit with commit messages.

**How to test it:**
For the `--disable-get-blobs-v2` part:

Run the beacon node with the `--disable-get-blobs-v2` flag in DEBUG
mode.
For every block with commitments, the following log should be displayed:
```
[2025-12-19 15:36:25.49] DEBUG sync: No data column sidecars constructed from the execution client ...
```

And the following log should **never** be displayed:
```
[2026-01-05 10:19:00.55] DEBUG sync: Constructed data column sidecars from the execution client count=...
```

For the #16171 part:
- No ERROR log showed in the linked issue should never be displayed.

**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-05 22:29:15 +00:00
Preston Van Loon
c5b3d3531c Added changelog for v7.1.1 (#16161)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

v7.1.1 release is coming today

**Which issues(s) does this PR fix?**


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-05 22:26:13 +00:00
Aarsh Shah
cc4510bb77 p2p: batch publish data column sidecars (#16183)
**What type of PR is this?**

Feature

What does this PR do? Why is it needed?

This PR takes @MarcoPolo 's PR at
https://github.com/OffchainLabs/prysm/pull/16130 to completion with
tests.

The description on his PR:

"""
a relatively small change to optimize network send order.

Without this, network writes tend to prioritize sending data for one
column to all peers before sending data for later columns (e.g for two
columns and 4 peers per column it would send A,A,A,A,B,B,B,B). With
batch publishing we can change the write order to round robin across
columns (e.g. A,B,A,B,A,B,A,B).

In cases where the process is sending at a rate over the network limit,
this approach allows at least some copies of the column to propagate
through the network. In early simulations with bandwidth limits of
50mbps for the publisher, this improved dissemination by ~20-30%.
"""
See the issue for some more context.

**Which issues(s) does this PR fix?**

Fixes https://github.com/OffchainLabs/prysm/issues/16129

Other notes for review

Acknowledgements

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: kasey <489222+kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-01-05 22:02:06 +00:00
37 changed files with 494 additions and 137 deletions

3
.gitignore vendored
View File

@@ -44,3 +44,6 @@ tmp
# spectest coverage reports
report.txt
# execution client data
execution/

View File

@@ -4,6 +4,47 @@ All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
## [v7.1.1](https://github.com/prysmaticlabs/prysm/compare/v7.1.0...v7.1.1) - 2025-12-18
Release highlights:
- Fixed potential deadlock scenario in data column batch verification
- Improved processing and metrics for cells and proofs
We are aware of [an issue](https://github.com/OffchainLabs/prysm/issues/16160) where Prysm struggles to sync from an out of sync state. We will have another release before the end of the year to address this issue.
Our postmortem document from the December 4th mainnet issue has been published on our [documentation site](https://prysm.offchainlabs.com/docs/misc/mainnet-postmortems/)
### Added
- Track the dependent root of the latest finalized checkpoint in forkchoice. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16103)
- Proposal design document to implement graffiti. Currently it is empty by default and the idea is to have it of the form GE168dPR63af. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15983)
- Add support for detecting and logging per address reachability via libp2p AutoNAT v2. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16100)
- Static analyzer that ensures each `httputil.HandleError` call is followed by a `return` statement. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16134)
- Prometheus histogram `cells_and_proofs_from_structured_computation_milliseconds` to track computation time for cells and proofs from structured blobs. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16115)
- Prometheus histogram `get_blobs_v2_latency_milliseconds` to track RPC latency for `getBlobsV2` calls to the execution layer. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16115)
### Changed
- Optimise migratetocold by not doing brute force for loop. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16101)
- e2e sync committee evaluator now skips the first slot after startup, we already skip the fork epoch for checks here, this skip only applies on startup, due to altair always from 0 and validators need to warm up. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16145)
- Run `ComputeCellsAndProofsFromFlat` in parallel to improve performance when computing cells and proofs. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16115)
- Run `ComputeCellsAndProofsFromStructured` in parallel to improve performance when computing cells and proofs. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16115)
### Removed
- Unnecessary copy is removed from Eth1DataHasEnoughSupport. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16118)
### Fixed
- Incorrect constructor return type [#16084](https://github.com/OffchainLabs/prysm/pull/16084). [[PR]](https://github.com/prysmaticlabs/prysm/pull/16084)
- Fixed possible race when validating two attestations at the same time. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16105)
- Fix missing return after version header check in SubmitAttesterSlashingsV2. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16126)
- Fix deadlock in data column gossip KZG batch verification when a caller times out preventing result delivery. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16141)
- Fixed replay state issue in rest api caused by attester and sync committee duties endpoints. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16136)
- Do not error when committee has been computed correctly but updating the cache failed. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16142)
- Prevent blocked sends to the KZG batch verifier when the caller context is already canceled, avoiding useless queueing and potential hangs. [[PR]](https://github.com/prysmaticlabs/prysm/pull/16144)
## [v7.1.0](https://github.com/prysmaticlabs/prysm/compare/v7.0.0...v7.1.0) - 2025-12-10
This release includes several key features/fixes. If you are running v7.0.0 then you should update to v7.0.1 or later and remove the flag `--disable-last-epoch-targets`.

View File

@@ -40,6 +40,7 @@ go_library(
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2))
}
if flags.Get().DisableGetBlobsV2 {
return []*pb.BlobAndProofV2{}, nil
}
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes)

View File

@@ -22,6 +22,7 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/sirupsen/logrus"
@@ -357,58 +358,67 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
return nil
}
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network.
// For sidecars with available peers, it uses batch publishing.
// For sidecars without peers, it finds peers first and then publishes individually.
// Both paths run in parallel. It returns when all broadcasts are complete, or the context is cancelled.
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
}
var (
wg sync.WaitGroup
timings sync.Map
)
var timings sync.Map
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
topicFunc := func(sidecar blocks.VerifiedRODataColumn) (topic string, wrappedSubIdx uint64, subnet uint64) {
subnet = peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
topic = dataColumnSubnetToTopic(subnet, forkDigest)
wrappedSubIdx = subnet + dataColumnSubnetVal
return
}
sidecarsWithPeers := make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
var sidecarsWithoutPeers []blocks.VerifiedRODataColumn
// Categorize sidecars by peer availability.
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
wg.Go(func() {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.broadcastDataColumnSidecars")
topic, wrappedSubIdx, _ := topicFunc(sidecar)
// Check if we have a peer for this subnet (use RLock for read-only check).
mu := s.subnetLocker(wrappedSubIdx)
mu.RLock()
hasPeer := s.hasPeerWithSubnet(topic)
mu.RUnlock()
if hasPeer {
sidecarsWithPeers = append(sidecarsWithPeers, sidecar)
continue
}
sidecarsWithoutPeers = append(sidecarsWithoutPeers, sidecar)
}
var batchWg, individualWg sync.WaitGroup
// Batch publish sidecars that already have peers
var messageBatch pubsub.MessageBatch
for _, sidecar := range sidecarsWithPeers {
batchWg.Go(func() {
_, span := trace.StartSpan(ctx, "p2p.broadcastDataColumnSidecars")
ctx := trace.NewContext(s.ctx, span)
defer span.End()
// Compute the subnet for this data column sidecar.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
topic, _, _ := topicFunc(sidecar)
// Build the topic corresponding to subnet column subnet and this fork digest.
topic := dataColumnSubnetToTopic(subnet, forkDigest)
// Compute the wrapped subnet index.
wrappedSubIdx := subnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
if err := s.batchObject(ctx, &messageBatch, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
log.WithError(err).Error("Cannot batch data column sidecar")
return
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot broadcast data column sidecar")
return
}
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
// Record the timing for log purposes.
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
@@ -416,8 +426,50 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
})
}
// Wait for all broadcasts to finish.
wg.Wait()
// For sidecars without peers, find peers and publish individually (no batching).
for _, sidecar := range sidecarsWithoutPeers {
individualWg.Go(func() {
_, span := trace.StartSpan(ctx, "p2p.broadcastDataColumnSidecars")
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, wrappedSubIdx, subnet := topicFunc(sidecar)
// Find peers for this sidecar's subnet.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
}
// Publish individually (not batched) since we just found peers.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot broadcast data column sidecar")
return
}
dataColumnSidecarBroadcasts.Inc()
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
}
})
}
// Wait for batch to be populated, then publish.
batchWg.Wait()
if len(sidecarsWithPeers) > 0 {
if err := s.pubsub.PublishBatch(&messageBatch); err != nil {
log.WithError(err).Error("Cannot publish batch for data column sidecars")
} else {
dataColumnSidecarBroadcasts.Add(float64(len(sidecarsWithPeers)))
}
}
// Wait for all individual publishes to complete.
individualWg.Wait()
// The rest of this function is only for debug logging purposes.
if logLevel < logrus.DebugLevel {
@@ -504,28 +556,68 @@ func (s *Service) findPeersIfNeeded(
return nil
}
// method to broadcast messages to other peers in our gossip mesh.
// encodeGossipMessage encodes an object for gossip transmission.
// It returns the encoded bytes and the full topic with protocol suffix.
func (s *Service) encodeGossipMessage(obj ssz.Marshaler, topic string) ([]byte, string, error) {
buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, obj); err != nil {
return nil, "", fmt.Errorf("could not encode message: %w", err)
}
return buf.Bytes(), topic + s.Encoding().ProtocolSuffix(), nil
}
// broadcastObject broadcasts a message to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
span.SetAttributes(trace.StringAttribute("topic", topic))
buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, obj); err != nil {
err := errors.Wrap(err, "could not encode message")
data, fullTopic, err := s.encodeGossipMessage(obj, topic)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
if span.IsRecording() {
id := hash.FastSum64(buf.Bytes())
messageLen := int64(buf.Len())
id := hash.FastSum64(data)
messageLen := int64(len(data))
// lint:ignore uintcast -- It's safe to do this for tracing.
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := s.PublishToTopic(ctx, fullTopic, data); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err
}
return nil
}
// batchObject adds an object to a message batch for a future broadcast.
// The caller MUST publish the batch after all messages have been added.
func (s *Service) batchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.batchObject")
defer span.End()
span.SetAttributes(trace.StringAttribute("topic", topic))
data, fullTopic, err := s.encodeGossipMessage(obj, topic)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
if span.IsRecording() {
id := hash.FastSum64(data)
messageLen := int64(len(data))
// lint:ignore uintcast -- It's safe to do this for tracing.
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.addToBatch(ctx, batch, fullTopic, data); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err

View File

@@ -32,6 +32,8 @@ import (
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"google.golang.org/protobuf/proto"
)
@@ -787,3 +789,190 @@ func TestService_BroadcastDataColumn(t *testing.T) {
require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
require.DeepEqual(t, &result, verifiedRoSidecar)
}
type topicInvoked struct {
topic string
pid peer.ID
}
// rpcOrderTracer is a RawTracer implementation that captures the order of SendRPC calls.
// It records the topics of messages sent via pubsub to verify round-robin ordering.
type rpcOrderTracer struct {
mu sync.Mutex
invoked []*topicInvoked
byTopic map[string][]peer.ID
}
func (t *rpcOrderTracer) SendRPC(rpc *pubsub.RPC, pid peer.ID) {
t.mu.Lock()
defer t.mu.Unlock()
for _, msg := range rpc.GetPublish() {
invoked := &topicInvoked{topic: msg.GetTopic(), pid: pid}
t.invoked = append(t.invoked, invoked)
t.byTopic[invoked.topic] = append(t.byTopic[invoked.topic], invoked.pid)
}
}
func newRpcOrderTracer() *rpcOrderTracer {
return &rpcOrderTracer{byTopic: make(map[string][]peer.ID)}
}
func (t *rpcOrderTracer) getTopics() []string {
t.mu.Lock()
defer t.mu.Unlock()
result := make([]string, len(t.invoked))
for i := range t.invoked {
result[i] = t.invoked[i].topic
}
return result
}
// No-op implementations for other RawTracer methods.
func (*rpcOrderTracer) AddPeer(peer.ID, protocol.ID) {}
func (*rpcOrderTracer) RemovePeer(peer.ID) {}
func (*rpcOrderTracer) Join(string) {}
func (*rpcOrderTracer) Leave(string) {}
func (*rpcOrderTracer) Graft(peer.ID, string) {}
func (*rpcOrderTracer) Prune(peer.ID, string) {}
func (*rpcOrderTracer) ValidateMessage(*pubsub.Message) {}
func (*rpcOrderTracer) DeliverMessage(*pubsub.Message) {}
func (*rpcOrderTracer) RejectMessage(*pubsub.Message, string) {}
func (*rpcOrderTracer) DuplicateMessage(*pubsub.Message) {}
func (*rpcOrderTracer) ThrottlePeer(peer.ID) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC) {}
func (*rpcOrderTracer) DropRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) UndeliverableMessage(*pubsub.Message) {}
// TestService_BroadcastDataColumnRoundRobin verifies that when broadcasting multiple
// data column sidecars, messages are interleaved in round-robin order by column index
// rather than sending all copies of one column before the next.
//
// Without batch publishing: A,A,A,A,B,B,B,B (all peers for column A, then all for column B)
// With batch publishing: A,B,A,B,A,B,A,B (interleaved by message ID)
func TestService_BroadcastDataColumnRoundRobin(t *testing.T) {
const (
port = 2100
topicFormat = DataColumnSubnetTopicFormat
)
ctx := t.Context()
// Load the KZG trust setup.
err := kzg.Start()
require.NoError(t, err)
gFlags := new(flags.GlobalFlags)
gFlags.MinimumPeersPerSubnet = 1
flags.Init(gFlags)
defer flags.Init(new(flags.GlobalFlags))
// Create a tracer to capture the order of SendRPC calls.
tracer := newRpcOrderTracer()
// Create the publisher node with the tracer injected.
p1 := p2ptest.NewTestP2PWithPubsubOptions(t, []pubsub.Option{pubsub.WithRawTracer(tracer)})
// Create subscriber peers.
expectedPeers := []*p2ptest.TestP2P{
p2ptest.NewTestP2P(t),
p2ptest.NewTestP2P(t),
}
// Connect peers.
for _, p := range expectedPeers {
p1.Connect(p)
}
require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers")
// Create a host for discovery.
_, pkey, ipAddr := createHost(t, port)
// Create a shared DB for the service.
db := testDB.SetupDB(t)
// Create and close the custody info channel immediately since custodyInfo is already set.
custodyInfoSet := make(chan struct{})
close(custodyInfoSet)
service := &Service{
ctx: ctx,
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{DB: db},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(ctx, &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
custodyInfo: &custodyInfo{},
custodyInfoSet: custodyInfoSet,
}
// Create a listener for discovery.
listener, err := service.startDiscoveryV5(ipAddr, pkey)
require.NoError(t, err)
service.dv5Listener = listener
digest, err := service.currentForkDigest()
require.NoError(t, err)
// Create multiple data column sidecars with different column indices.
// Use indices that map to different subnets: 0, 32, 64 (assuming 128 columns and 64 subnets).
columnIndices := []uint64{0, 32, 64}
params := make([]util.DataColumnParam, len(columnIndices))
for i, idx := range columnIndices {
params[i] = util.DataColumnParam{Index: idx}
}
_, verifiedRoSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, params)
expectedTopics := make(map[string]bool)
// Subscribe peers to the relevant topics.
for _, idx := range columnIndices {
subnet := peerdas.ComputeSubnetForDataColumnSidecar(idx)
topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()
for _, p := range expectedPeers {
_, err = p.SubscribeToTopic(topic)
require.NoError(t, err)
}
expectedTopics[topic] = true
}
// libp2p needs some time to establish mesh connections.
time.Sleep(100 * time.Millisecond)
// Broadcast all sidecars.
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars)
require.NoError(t, err)
// Give some time for messages to be sent.
time.Sleep(100 * time.Millisecond)
topics := tracer.getTopics()
if len(topics) == 0 {
t.Fatal("Expected at least one message for each topic to be sent to each peer")
}
unseen := make(map[string]bool)
for k := range expectedTopics {
unseen[k] = true
}
// Verify round-robin invariant: before all message IDs are seen, no message ID may be repeated.
// In round-robin order, we should see each topic once before any topic repeats.
for _, topic := range topics {
if !expectedTopics[topic] {
continue
}
if !unseen[topic] {
t.Errorf("Topic %s repeated before all topics were seen once. This violates round-robin ordering.", topic)
}
delete(unseen, topic)
if len(unseen) == 0 {
break // all have been seen
}
}
require.Equal(t, 0, len(unseen))
// Verify that we actually saw all expected topics.
for topic := range expectedTopics {
require.Equal(t, len(expectedPeers), len(tracer.byTopic[topic]))
}
}

View File

@@ -99,6 +99,27 @@ func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte,
}
}
// addToBatch joins (if necessary) a topic and adds the message to a message batch.
func (s *Service) addToBatch(ctx context.Context, batch *pubsub.MessageBatch, topic string, data []byte, opts ...pubsub.PubOpt) error {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return fmt.Errorf("joining topic: %w", err)
}
// Wait for at least 1 peer to be available to receive the published message.
for {
if flags.Get().MinimumSyncPeers == 0 || len(topicHandle.ListPeers()) > 0 {
return topicHandle.AddToBatch(ctx, batch, data, opts...)
}
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
case <-time.After(100 * time.Millisecond):
// reenter the for loop after 100ms
}
}
}
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
s.awaitStateInitialized() // Genesis time and genesis validators root are required to subscribe.

View File

@@ -70,6 +70,11 @@ type TestP2P struct {
// NewTestP2P initializes a new p2p test service.
func NewTestP2P(t *testing.T, userOptions ...config.Option) *TestP2P {
return NewTestP2PWithPubsubOptions(t, nil, userOptions...)
}
// NewTestP2PWithPubsubOptions initializes a new p2p test service with custom pubsub options.
func NewTestP2PWithPubsubOptions(t *testing.T, pubsubOpts []pubsub.Option, userOptions ...config.Option) *TestP2P {
ctx := context.Background()
options := []config.Option{
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -84,10 +89,14 @@ func NewTestP2P(t *testing.T, userOptions ...config.Option) *TestP2P {
h, err := libp2p.New(options...)
require.NoError(t, err)
ps, err := pubsub.NewFloodSub(ctx, h,
defaultPubsubOpts := []pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
)
}
allPubsubOpts := append(defaultPubsubOpts, pubsubOpts...)
ps, err := pubsub.NewGossipSub(ctx, h, allPubsubOpts...)
if err != nil {
t.Fatal(err)
}

View File

@@ -48,7 +48,14 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return errors.Wrap(err, "new ro block with root")
}
go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
go func() {
if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": block.Slot(),
}).Error("Failed to process sidecars from execution from block")
}
}()
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) {
@@ -69,28 +76,37 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
}
return err
}
if err := s.processPendingAttsForBlock(ctx, root); err != nil {
return errors.Wrap(err, "process pending atts for block")
}
return nil
}
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
if roBlock.Version() >= version.Fulu {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return
// Do not log if the context was cancelled on purpose.
// (Still log other context errors such as deadlines exceeded).
if errors.Is(err, context.Canceled) {
return nil
}
return errors.Wrap(err, "process data column sidecars from execution")
}
return
return nil
}
if roBlock.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, roBlock)
return
return nil
}
return nil
}
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
@@ -168,7 +184,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
key := fmt.Sprintf("%#x", source.Root())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
commitments, err := source.Commitments()
if err != nil {
@@ -186,9 +201,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "column indices to sample")
}
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
@@ -209,6 +221,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, nil
}
// Return if the context is done.
if ctx.Err() != nil {
return nil, ctx.Err()
}
if iteration == 0 {
dataColumnsRecoveredFromELAttempts.Inc()
}
@@ -220,20 +237,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
}
// No sidecars are retrieved from the EL, retry later
constructedSidecarCount = uint64(len(constructedSidecars))
if constructedSidecarCount == 0 {
if ctx.Err() != nil {
return nil, ctx.Err()
}
time.Sleep(delay)
continue
}
dataColumnsRecoveredFromELTotal.Inc()
constructedCount := uint64(len(constructedSidecars))
// Boundary check.
if constructedSidecarCount != fieldparams.NumberOfColumns {
if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns {
return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
}
@@ -242,14 +249,24 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
}
log.WithFields(logrus.Fields{
"count": len(unseenIndices),
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
if constructedCount > 0 {
dataColumnsRecoveredFromELTotal.Inc()
dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices)))
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
"proposerIndex": source.ProposerIndex(),
"iteration": iteration,
"type": source.Type(),
"count": len(unseenIndices),
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
return nil, nil
return nil, nil
}
// Wait before retrying.
time.Sleep(delay)
}
}); err != nil {
return err
@@ -284,6 +301,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
unseenIndices[sidecar.Index] = true
}
// Exit early if there are no nothing to broadcast or receive.
if len(unseenSidecars) == 0 {
return nil, nil
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars")

View File

@@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
},
seenBlobCache: lruwrpr.New(1),
}
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
@@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
roBlock, err := blocks.NewROBlock(sb)
require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
})
}

View File

@@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
return wrapDataColumnError(sidecar, "receive data column sidecar", err)
}
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction")
return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err)
}
return nil
@@ -38,7 +38,13 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
return errors.Wrap(err, "process data column sidecars from execution")
if errors.Is(err, context.Canceled) {
// Do not log if the context was cancelled on purpose.
// (Still log other context errors such as deadlines exceeded).
return nil
}
return wrapDataColumnError(sidecar, "process data column sidecars from execution", err)
}
return nil
@@ -110,3 +116,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
return allSubnets
}
func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error {
return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err)
}

View File

@@ -1,3 +0,0 @@
## Fixed
- Fix missing return after version header check in SubmitAttesterSlashingsV2.

View File

@@ -1,3 +0,0 @@
## Fixed
- incorrect constructor return type [#16084](https://github.com/OffchainLabs/prysm/pull/16084)

View File

@@ -1,2 +0,0 @@
### Ignored
- Reverts AutoNatV2 change introduced in https://github.com/OffchainLabs/prysm/pull/16100 as the libp2p upgrade fails inter-op testing.

View File

@@ -1,3 +0,0 @@
### Fixed
- Prevent blocked sends to the KZG batch verifier when the caller context is already canceled, avoiding useless queueing and potential hangs.

View File

@@ -1,3 +0,0 @@
### Fixed
- Fix deadlock in data column gossip KZG batch verification when a caller times out preventing result delivery.

View File

@@ -1,3 +0,0 @@
### Fixed
- fixed replay state issue in rest api caused by attester and sync committee duties endpoints

View File

@@ -1,3 +0,0 @@
### Changed
- e2e sync committee evaluator now skips the first slot after startup, we already skip the fork epoch for checks here, this skip only applies on startup, due to altair always from 0 and validators need to warm up.

View File

@@ -1,2 +0,0 @@
### Ignored
- Added test requirement to `PULL_REQUEST_TEMPLATE.md`

View File

@@ -0,0 +1,2 @@
### Added
- `--disable-get-blobs-v2` flag.

View File

@@ -1,7 +0,0 @@
### Added
- prometheus histogram `cells_and_proofs_from_structured_computation_milliseconds` to track computation time for cells and proofs from structured blobs.
- prometheus histogram `get_blobs_v2_latency_milliseconds` to track RPC latency for `getBlobsV2` calls to the execution layer.
### Changed
- Run `ComputeCellsAndProofsFromFlat` in parallel to improve performance when computing cells and proofs.
- Run `ComputeCellsAndProofsFromStructured` in parallel to improve performance when computing cells and proofs.

View File

@@ -0,0 +1,3 @@
### Added
- Batch publish data columns for faster data propogation.

View File

@@ -1,3 +0,0 @@
### Fixed
- Fixed possible race when validating two attestations at the same time.

View File

@@ -1,3 +0,0 @@
### Added
- Track the dependent root of the latest finalized checkpoint in forkchoice.

View File

@@ -1,3 +0,0 @@
### Fixed
- Do not error when committee has been computed correctly but updating the cache failed.

View File

@@ -1,3 +0,0 @@
### Ignored
- Updated CHANGELOG.md for v7.0.1 patch release

View File

@@ -1,3 +0,0 @@
### Ignored
- Changelog for v7.1.0

3
changelog/pvl-v7.1.1.md Normal file
View File

@@ -0,0 +1,3 @@
### Ignored
- Added changelog for v7.1.1

View File

@@ -1,3 +0,0 @@
### Added
- Static analyzer that ensures each `httputil.HandleError` call is followed by a `return` statement.

View File

@@ -1,3 +0,0 @@
### Ignored
- Use `WriteStateFetchError` in API handlers whenever possible.

View File

@@ -1,3 +0,0 @@
### Removed
- Unnecessary copy is removed from Eth1DataHasEnoughSupport

View File

@@ -1,3 +0,0 @@
### Added
- Proposal design document to implement graffiti. Currently it is empty by default and the idea is to have it of the form GE168dPR63af

View File

@@ -1,3 +0,0 @@
### Changed
- Optimise migratetocold by not doing brute force for loop

View File

@@ -356,4 +356,9 @@ var (
Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
}
// DisableGetBlobsV2 disables the engine_getBlobsV2 usage.
DisableGetBlobsV2 = &cli.BoolFlag{
Name: "disable-get-blobs-v2",
Usage: "Disables the engine_getBlobsV2 usage.",
}
)

View File

@@ -17,6 +17,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool
Supernode bool
SemiSupernode bool
DisableGetBlobsV2 bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
@@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
cfg.SemiSupernode = true
}
if ctx.Bool(DisableGetBlobsV2.Name) {
log.Warning("Disabling `engine_getBlobsV2` API")
cfg.DisableGetBlobsV2 = true
}
// State-diff-exponents
cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name)
if features.Get().EnableStateDiff {

View File

@@ -148,6 +148,7 @@ var appFlags = []cli.Flag{
flags.SlasherDirFlag,
flags.SlasherFlag,
flags.JwtId,
flags.DisableGetBlobsV2,
storage.BlobStoragePathFlag,
storage.DataColumnStoragePathFlag,
storage.BlobStorageLayout,

View File

@@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{
flags.ExecutionJWTSecretFlag,
flags.JwtId,
flags.InteropMockEth1DataVotesFlag,
flags.DisableGetBlobsV2,
},
},
{ // Flags relevant to configuring beacon chain monitoring.