Compare commits

..

2 Commits

Author SHA1 Message Date
satushh
179fc5782c changelog 2026-01-13 20:27:58 +05:30
satushh
741debab34 avoid unnecessary buffer allocation 2026-01-13 20:22:31 +05:30
19 changed files with 534 additions and 306 deletions

View File

@@ -72,7 +72,7 @@ Do NOT add new `go_repository` to the WORKSPACE file. All dependencies should li
To enable conditional compilation and custom configuration for tests (where compiled code has more
debug info, while not being completely optimized), we rely on Go's build tags/constraints mechanism
(see official docs on [build constraints](https://pkg.go.dev/go/build#hdr-Build_Constraints)).
(see official docs on [build constraints](https://golang.org/pkg/go/build/#hdr-Build_Constraints)).
Therefore, whenever using `go test`, do not forget to pass in extra build tag, eg:
```bash

View File

@@ -9,7 +9,7 @@ This README details how to setup Prysm for interop testing for usage with other
## Installation & Setup
1. Install [Bazel](https://bazel.build/install) **(Recommended)**
1. Install [Bazel](https://docs.bazel.build/versions/master/install.html) **(Recommended)**
2. `git clone https://github.com/OffchainLabs/prysm && cd prysm`
3. `bazel build //cmd/...`

View File

@@ -15,7 +15,7 @@
## 📖 Overview
This is the core repository for Prysm, a [Golang](https://go.dev/) implementation of the [Ethereum Consensus](https://ethereum.org/en/developers/docs/consensus-mechanisms/#proof-of-stake) [specification](https://github.com/ethereum/consensus-specs), developed by [Offchain Labs](https://www.offchainlabs.com).
This is the core repository for Prysm, a [Golang](https://golang.org/) implementation of the [Ethereum Consensus](https://ethereum.org/en/developers/docs/consensus-mechanisms/#proof-of-stake) [specification](https://github.com/ethereum/consensus-specs), developed by [Offchain Labs](https://www.offchainlabs.com).
See the [Changelog](https://github.com/OffchainLabs/prysm/releases) for details of the latest releases and upcoming breaking changes.
@@ -23,7 +23,7 @@ See the [Changelog](https://github.com/OffchainLabs/prysm/releases) for details
## 🚀 Getting Started
A detailed set of installation and usage instructions as well as breakdowns of each individual component are available in the **[official documentation portal](https://prysm.offchainlabs.com/docs/)**.
A detailed set of installation and usage instructions as well as breakdowns of each individual component are available in the **[official documentation portal](https://docs.prylabs.network)**.
💬 **Need help?** Join our **[Discord Community](https://discord.gg/prysm)** for support.
@@ -51,7 +51,7 @@ Prysm maintains two permanent branches:
### 🛠 Contribution Guide
Want to get involved? Check out our **[Contribution Guide](https://prysm.offchainlabs.com/docs/contribute/contribution-guidelines/)** to learn more!
Want to get involved? Check out our **[Contribution Guide](https://docs.prylabs.network/docs/contribute/contribution-guidelines/)** to learn more!
---

View File

@@ -238,14 +238,9 @@ func recomputeRootFromLayerVariable(idx int, item [32]byte, layers [][]*[32]byte
// AddInMixin describes a method from which a length mixin is added to the
// provided root.
func AddInMixin(root [32]byte, length uint64) ([32]byte, error) {
rootBuf := new(bytes.Buffer)
if err := binary.Write(rootBuf, binary.LittleEndian, length); err != nil {
return [32]byte{}, errors.Wrap(err, "could not marshal eth1data votes length")
}
// We need to mix in the length of the slice.
rootBufRoot := make([]byte, 32)
copy(rootBufRoot, rootBuf.Bytes())
return ssz.MixInLength(root, rootBufRoot), nil
var rootBufRoot [32]byte
binary.LittleEndian.PutUint64(rootBufRoot[:], length)
return ssz.MixInLength(root, rootBufRoot[:]), nil
}
// Merkleize 32-byte leaves into a Merkle trie for its adequate depth, returning

View File

@@ -4,6 +4,9 @@ import (
"context"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
@@ -53,6 +56,32 @@ func (s *Service) verifierRoutine() {
}
}
// A routine that runs in the background to perform batch
// KZG verifications by draining the channel and processing all pending requests.
func (s *Service) kzgVerifierRoutine() {
for {
kzgBatch := make([]*kzgVerifier, 0, 1)
select {
case <-s.ctx.Done():
return
case kzg := <-s.kzgChan:
kzgBatch = append(kzgBatch, kzg)
}
for {
select {
case <-s.ctx.Done():
return
case kzg := <-s.kzgChan:
kzgBatch = append(kzgBatch, kzg)
continue
default:
verifyKzgBatch(kzgBatch)
}
break
}
}
}
func (s *Service) validateWithBatchVerifier(ctx context.Context, message string, set *bls.SignatureBatch) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateWithBatchVerifier")
defer span.End()
@@ -125,3 +154,71 @@ func performBatchAggregation(aggSet *bls.SignatureBatch) (*bls.SignatureBatch, e
}
return aggSet, nil
}
func (s *Service) validateWithKzgBatchVerifier(ctx context.Context, dataColumns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateWithKzgBatchVerifier")
defer span.End()
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
resChan := make(chan error, 1)
verificationSet := &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case s.kzgChan <- verificationSet:
case <-ctx.Done():
return pubsub.ValidationIgnore, ctx.Err()
}
select {
case <-ctx.Done():
return pubsub.ValidationIgnore, ctx.Err() // parent context canceled, give up
case err := <-resChan:
if err != nil {
log.WithError(err).Trace("Could not perform batch verification")
tracing.AnnotateError(span, err)
return s.validateUnbatchedColumnsKzg(ctx, dataColumns)
}
}
return pubsub.ValidationAccept, nil
}
func (s *Service) validateUnbatchedColumnsKzg(ctx context.Context, columns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateUnbatchedColumnsKzg")
defer span.End()
start := time.Now()
if err := peerdas.VerifyDataColumnsSidecarKZGProofs(columns); err != nil {
err = errors.Wrap(err, "could not verify")
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, err
}
verification.DataColumnBatchKZGVerificationHistogram.WithLabelValues("fallback").Observe(float64(time.Since(start).Milliseconds()))
return pubsub.ValidationAccept, nil
}
func verifyKzgBatch(kzgBatch []*kzgVerifier) {
if len(kzgBatch) == 0 {
return
}
allDataColumns := make([]blocks.RODataColumn, 0, len(kzgBatch))
for _, kzgVerifier := range kzgBatch {
allDataColumns = append(allDataColumns, kzgVerifier.dataColumns...)
}
var verificationErr error
start := time.Now()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allDataColumns)
if err != nil {
verificationErr = errors.Wrap(err, "batch KZG verification failed")
} else {
verification.DataColumnBatchKZGVerificationHistogram.WithLabelValues("batch").Observe(float64(time.Since(start).Milliseconds()))
}
// Send the same result to all verifiers in the batch
for _, verifier := range kzgBatch {
verifier.resChan <- verificationErr
}
}

View File

@@ -1,14 +1,339 @@
package sync
import (
"context"
"sync"
"testing"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
func TestValidateWithKzgBatchVerifier(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
tests := []struct {
name string
dataColumns []blocks.RODataColumn
expectedResult pubsub.ValidationResult
expectError bool
}{
{
name: "single valid data column",
dataColumns: createValidTestDataColumns(t, 1),
expectedResult: pubsub.ValidationAccept,
expectError: false,
},
{
name: "multiple valid data columns",
dataColumns: createValidTestDataColumns(t, 3),
expectedResult: pubsub.ValidationAccept,
expectError: false,
},
{
name: "single invalid data column",
dataColumns: createInvalidTestDataColumns(t, 1),
expectedResult: pubsub.ValidationReject,
expectError: true,
},
{
name: "empty data column slice",
dataColumns: []blocks.RODataColumn{},
expectedResult: pubsub.ValidationAccept,
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
result, err := service.validateWithKzgBatchVerifier(ctx, tt.dataColumns)
require.Equal(t, tt.expectedResult, result)
if tt.expectError {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
func TestVerifierRoutine(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
t.Run("processes single request", func(t *testing.T) {
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
dataColumns := createValidTestDataColumns(t, 1)
resChan := make(chan error, 1)
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
select {
case err := <-resChan:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for verification result")
}
})
t.Run("batches multiple requests", func(t *testing.T) {
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
const numRequests = 5
resChans := make([]chan error, numRequests)
for i := range numRequests {
dataColumns := createValidTestDataColumns(t, 1)
resChan := make(chan error, 1)
resChans[i] = resChan
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
}
for i := range numRequests {
select {
case err := <-resChans[i]:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatalf("timeout waiting for verification result %d", i)
}
}
})
t.Run("context cancellation stops routine", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
routineDone := make(chan struct{})
go func() {
service.kzgVerifierRoutine()
close(routineDone)
}()
cancel()
select {
case <-routineDone:
case <-time.After(time.Second):
t.Fatal("timeout waiting for routine to exit")
}
})
}
func TestVerifyKzgBatch(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
t.Run("all valid data columns succeed", func(t *testing.T) {
dataColumns := createValidTestDataColumns(t, 3)
resChan := make(chan error, 1)
kzgVerifiers := []*kzgVerifier{{dataColumns: dataColumns, resChan: resChan}}
verifyKzgBatch(kzgVerifiers)
select {
case err := <-resChan:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for batch verification")
}
})
t.Run("invalid proofs fail entire batch", func(t *testing.T) {
validColumns := createValidTestDataColumns(t, 1)
invalidColumns := createInvalidTestDataColumns(t, 1)
allColumns := append(validColumns, invalidColumns...)
resChan := make(chan error, 1)
kzgVerifiers := []*kzgVerifier{{dataColumns: allColumns, resChan: resChan}}
verifyKzgBatch(kzgVerifiers)
select {
case err := <-resChan:
assert.NotNil(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for batch verification")
}
})
t.Run("empty batch handling", func(t *testing.T) {
verifyKzgBatch([]*kzgVerifier{})
})
}
func TestKzgBatchVerifierConcurrency(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
const numGoroutines = 10
const numRequestsPerGoroutine = 5
var wg sync.WaitGroup
wg.Add(numGoroutines)
// Multiple goroutines sending verification requests simultaneously
for i := range numGoroutines {
go func(goroutineID int) {
defer wg.Done()
for range numRequestsPerGoroutine {
dataColumns := createValidTestDataColumns(t, 1)
result, err := service.validateWithKzgBatchVerifier(ctx, dataColumns)
require.Equal(t, pubsub.ValidationAccept, result)
require.NoError(t, err)
}
}(i)
}
wg.Wait()
}
func TestKzgBatchVerifierFallback(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
t.Run("fallback handles mixed valid/invalid batch correctly", func(t *testing.T) {
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
validColumns := createValidTestDataColumns(t, 1)
invalidColumns := createInvalidTestDataColumns(t, 1)
result, err := service.validateWithKzgBatchVerifier(ctx, validColumns)
require.Equal(t, pubsub.ValidationAccept, result)
require.NoError(t, err)
result, err = service.validateWithKzgBatchVerifier(ctx, invalidColumns)
require.Equal(t, pubsub.ValidationReject, result)
assert.NotNil(t, err)
})
t.Run("empty data columns fallback", func(t *testing.T) {
ctx := t.Context()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier, 100),
}
go service.kzgVerifierRoutine()
result, err := service.validateWithKzgBatchVerifier(ctx, []blocks.RODataColumn{})
require.Equal(t, pubsub.ValidationAccept, result)
require.NoError(t, err)
})
}
func TestValidateWithKzgBatchVerifier_DeadlockOnTimeout(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.SecondsPerSlot = 0
params.OverrideBeaconConfig(cfg)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
service := &Service{
ctx: ctx,
kzgChan: make(chan *kzgVerifier),
}
go service.kzgVerifierRoutine()
result, err := service.validateWithKzgBatchVerifier(context.Background(), nil)
require.Equal(t, pubsub.ValidationIgnore, result)
require.ErrorIs(t, err, context.DeadlineExceeded)
done := make(chan struct{})
go func() {
_, _ = service.validateWithKzgBatchVerifier(context.Background(), nil)
close(done)
}()
select {
case <-done:
case <-time.After(500 * time.Millisecond):
t.Fatal("validateWithKzgBatchVerifier blocked")
}
}
func TestValidateWithKzgBatchVerifier_ContextCanceledBeforeSend(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(t.Context())
cancel()
service := &Service{
ctx: context.Background(),
kzgChan: make(chan *kzgVerifier),
}
done := make(chan struct{})
go func() {
result, err := service.validateWithKzgBatchVerifier(cancelledCtx, nil)
require.Equal(t, pubsub.ValidationIgnore, result)
require.ErrorIs(t, err, context.Canceled)
close(done)
}()
select {
case <-done:
case <-time.After(500 * time.Millisecond):
t.Fatal("validateWithKzgBatchVerifier did not return after context cancellation")
}
select {
case <-service.kzgChan:
t.Fatal("verificationSet was sent to kzgChan despite canceled context")
default:
}
}
func createValidTestDataColumns(t *testing.T, count int) []blocks.RODataColumn {
_, roSidecars, _ := util.GenerateTestFuluBlockWithSidecars(t, count)
if len(roSidecars) >= count {

View File

@@ -256,20 +256,6 @@ var (
Help: "Count the number of data column sidecars obtained via the execution layer.",
},
)
commitmentCount = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "commitment_count_in_gossip_processed_blocks",
Help: "The number of blob KZG commitments in the most recently processed block.",
},
)
maxBlobsPerBlock = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "max_blobs_per_block",
Help: "The maximum number of blobs allowed in a block.",
},
)
)
func (s *Service) updateMetrics() {

View File

@@ -168,6 +168,7 @@ type Service struct {
syncContributionBitsOverlapLock sync.RWMutex
syncContributionBitsOverlapCache *lru.Cache
signatureChan chan *signatureVerifier
kzgChan chan *kzgVerifier
clockWaiter startup.ClockWaiter
initialSyncComplete chan struct{}
verifierWaiter *verification.InitializerWaiter
@@ -208,7 +209,10 @@ func NewService(ctx context.Context, opts ...Option) *Service {
}
// Initialize signature channel with configured limit
r.signatureChan = make(chan *signatureVerifier, r.cfg.batchVerifierLimit)
// Initialize KZG channel with fixed buffer size of 100.
// This buffer size is designed to handle burst traffic of data column gossip messages:
// - Data columns arrive less frequently than attestations (default batchVerifierLimit=1000)
r.kzgChan = make(chan *kzgVerifier, 100)
// Correctly remove it from our seen pending block map.
// The eviction method always assumes that the mutex is held.
r.slotToPendingBlocks.OnEvicted(func(s string, i any) {
@@ -261,6 +265,7 @@ func (s *Service) Start() {
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
go s.verifierRoutine()
go s.kzgVerifierRoutine()
go s.startDiscoveryAndSubscriptions()
go s.processDataColumnLogs()

View File

@@ -81,18 +81,6 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return errors.Wrap(err, "process pending atts for block")
}
if block.Body() == nil {
log.Errorf("nil block body for root %#x", root)
}
commitments, err := block.Body().BlobKzgCommitments()
if err != nil {
return errors.Wrap(err, "blob Kzg commitments")
}
commitmentCount.Set(float64(len(commitments)))
maxBlobsPerBlock.Set(float64(params.BeaconConfig().MaxBlobsPerBlock(block.Slot())))
return nil
}

View File

@@ -144,9 +144,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
}
// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
if err := verifier.SidecarKzgProofVerified(); err != nil {
return pubsub.ValidationReject, err
validationResult, err := s.validateWithKzgBatchVerifier(ctx, roDataColumns)
if validationResult != pubsub.ValidationAccept {
return validationResult, err
}
// Mark KZG verification as satisfied since we did it via batch verifier
verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
// with valid header signature, sidecar inclusion proof, and kzg proof.

View File

@@ -71,7 +71,10 @@ func TestValidateDataColumn(t *testing.T) {
ctx: ctx,
newColumnsVerifier: newDataColumnsVerifier,
seenDataColumnCache: newSlotAwareCache(seenDataColumnSize),
kzgChan: make(chan *kzgVerifier, 100),
}
// Start the KZG verifier routine for batch verification
go service.kzgVerifierRoutine()
// Encode a `beaconBlock` message instead of expected.
buf := new(bytes.Buffer)

View File

@@ -588,12 +588,6 @@ func fcReturnsTargetRoot(root [32]byte) func([32]byte, primitives.Epoch) ([32]by
}
}
func fcReturnsDependentRoot() func([32]byte, primitives.Epoch) ([32]byte, error) {
return func(root [32]byte, epoch primitives.Epoch) ([32]byte, error) {
return root, nil
}
}
type mockSignatureCache struct {
svCalledForSig map[signatureData]bool
svcb func(sig signatureData) (bool, error)

View File

@@ -1,6 +1,7 @@
package verification
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
@@ -18,7 +19,6 @@ import (
"github.com/OffchainLabs/prysm/v7/runtime/logging"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
@@ -293,57 +293,55 @@ func (dv *RODataColumnsVerifier) ValidProposerSignature(ctx context.Context) (er
// The returned state is guaranteed to be at the same epoch as the data column's epoch, and have the same randao mix and active
// validator indices as the data column's parent state advanced to the data column's slot.
func (dv *RODataColumnsVerifier) getVerifyingState(ctx context.Context, dataColumn blocks.RODataColumn) (state.ReadOnlyBeaconState, error) {
dataColumnSlot := dataColumn.Slot()
dataColumnEpoch := slots.ToEpoch(dataColumnSlot)
if dataColumnEpoch == 0 {
return dv.hsp.HeadStateReadOnly(ctx)
}
parentRoot := dataColumn.ParentRoot()
dcDependentRoot, err := dv.fc.DependentRootForEpoch(parentRoot, dataColumnEpoch-1)
if err != nil {
return nil, err
}
headRoot, err := dv.hsp.HeadRoot(ctx)
if err != nil {
return nil, err
}
headDependentRoot, err := dv.fc.DependentRootForEpoch(bytesutil.ToBytes32(headRoot), dataColumnEpoch-1)
if err != nil {
return nil, err
}
if dcDependentRoot == headDependentRoot {
headSlot := dv.hsp.HeadSlot()
headEpoch := slots.ToEpoch(headSlot)
if headEpoch == dataColumnEpoch || headEpoch == dataColumnEpoch-1 {
parentRoot := dataColumn.ParentRoot()
dataColumnSlot := dataColumn.Slot()
dataColumnEpoch := slots.ToEpoch(dataColumnSlot)
headSlot := dv.hsp.HeadSlot()
headEpoch := slots.ToEpoch(headSlot)
// Use head if it's the parent
if bytes.Equal(parentRoot[:], headRoot) {
// If they are in the same epoch, then we can return the head state directly
if dataColumnEpoch == headEpoch {
return dv.hsp.HeadStateReadOnly(ctx)
}
if headEpoch+1 < dataColumnEpoch {
headState, err := dv.hsp.HeadState(ctx)
if err != nil {
return nil, err
}
return transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot, dataColumnSlot)
// Otherwise, we need to process the head state to the data column's slot
headState, err := dv.hsp.HeadState(ctx)
if err != nil {
return nil, err
}
return transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot, dataColumnSlot)
}
// If head and data column are in the same epoch and head is compatible with the parent's depdendent root, then use head
if dataColumnEpoch == headEpoch {
headDependent, err := dv.fc.DependentRootForEpoch(bytesutil.ToBytes32(headRoot), dataColumnEpoch)
if err != nil {
return nil, err
}
parentDependent, err := dv.fc.DependentRootForEpoch(parentRoot, dataColumnEpoch)
if err != nil {
return nil, err
}
if bytes.Equal(headDependent[:], parentDependent[:]) {
return dv.hsp.HeadStateReadOnly(ctx)
}
}
logrus.WithFields(logrus.Fields{
"slot": dataColumnSlot,
"parentRoot": fmt.Sprintf("%#x", parentRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Debug("Replying state for data column verification")
targetRoot, err := dv.fc.TargetRootForEpoch(parentRoot, dataColumnEpoch)
// Otherwise retrieve the parent state and advance it to the data column's slot
parentState, err := dv.sr.StateByRoot(ctx, parentRoot)
if err != nil {
return nil, err
}
targetState, err := dv.sr.StateByRoot(ctx, targetRoot)
if err != nil {
return nil, err
parentEpoch := slots.ToEpoch(parentState.Slot())
if dataColumnEpoch == parentEpoch {
return parentState, nil
}
targetEpoch := slots.ToEpoch(targetState.Slot())
if targetEpoch == dataColumnEpoch || targetEpoch == dataColumnEpoch-1 {
return targetState, nil
}
return transition.ProcessSlotsUsingNextSlotCache(ctx, targetState, parentRoot[:], dataColumnSlot)
return transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot[:], dataColumnSlot)
}
func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.RootLength]byte) bool) (err error) {

View File

@@ -1,7 +1,6 @@
package verification
import (
"context"
"reflect"
"testing"
"time"
@@ -10,7 +9,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -283,7 +281,7 @@ func TestColumnSlotAboveFinalized(t *testing.T) {
func TestValidProposerSignature(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -296,83 +294,59 @@ func TestValidProposerSignature(t *testing.T) {
// The signature data does not depend on the data column itself, so we can use the first one.
expectedSignatureData := columnToSignatureData(firstColumn)
// Create a proper Fulu state for verification.
// We need enough validators to cover the proposer index.
numValidators := max(uint64(firstColumn.ProposerIndex()+1), 64)
fuluState, _ := util.DeterministicGenesisStateFulu(t, numValidators)
// Head state provider that returns the fuluState via HeadStateReadOnly path.
headStateWithState := &mockHeadStateProvider{
headRoot: parentRoot[:],
headSlot: columnSlot,
headStateReadOnly: fuluState,
}
// Head state provider that will fail (headStateReadOnly is nil).
headStateNotFound := &mockHeadStateProvider{
headRoot: parentRoot[:],
headSlot: columnSlot,
}
testCases := []struct {
isError bool
vscbShouldError bool
svcbReturn bool
stateByRooter StateByRooter
headStateProvider *mockHeadStateProvider
vscbError error
svcbError error
name string
isError bool
vscbShouldError bool
svcbReturn bool
stateByRooter StateByRooter
vscbError error
svcbError error
name string
}{
{
name: "cache hit - success",
svcbReturn: true,
svcbError: nil,
vscbShouldError: true,
vscbError: nil,
stateByRooter: &mockStateByRooter{sbr: sbrErrorIfCalled(t)},
headStateProvider: headStateWithState,
isError: false,
name: "cache hit - success",
svcbReturn: true,
svcbError: nil,
vscbShouldError: true,
vscbError: nil,
stateByRooter: &mockStateByRooter{sbr: sbrErrorIfCalled(t)},
isError: false,
},
{
name: "cache hit - error",
svcbReturn: true,
svcbError: errors.New("derp"),
vscbShouldError: true,
vscbError: nil,
stateByRooter: &mockStateByRooter{sbr: sbrErrorIfCalled(t)},
headStateProvider: headStateWithState,
isError: true,
name: "cache hit - error",
svcbReturn: true,
svcbError: errors.New("derp"),
vscbShouldError: true,
vscbError: nil,
stateByRooter: &mockStateByRooter{sbr: sbrErrorIfCalled(t)},
isError: true,
},
{
name: "cache miss - success",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: nil,
stateByRooter: sbrForValOverrideWithT(t, firstColumn.ProposerIndex(), validator),
headStateProvider: headStateWithState,
isError: false,
name: "cache miss - success",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: nil,
stateByRooter: sbrForValOverrideWithT(t, firstColumn.ProposerIndex(), validator),
isError: false,
},
{
name: "cache miss - state not found",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: nil,
stateByRooter: sbrNotFound(t, expectedSignatureData.Parent),
headStateProvider: headStateNotFound,
isError: true,
name: "cache miss - state not found",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: nil,
stateByRooter: sbrNotFound(t, expectedSignatureData.Parent),
isError: true,
},
{
name: "cache miss - signature failure",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: errors.New("signature, not so good!"),
stateByRooter: sbrForValOverrideWithT(t, firstColumn.ProposerIndex(), validator),
headStateProvider: headStateWithState,
isError: true,
name: "cache miss - signature failure",
svcbReturn: false,
svcbError: nil,
vscbShouldError: false,
vscbError: errors.New("signature, not so good!"),
stateByRooter: sbrForValOverrideWithT(t, firstColumn.ProposerIndex(), validator),
isError: true,
},
}
@@ -403,10 +377,9 @@ func TestValidProposerSignature(t *testing.T) {
shared: &sharedResources{
sc: signatureCache,
sr: tc.stateByRooter,
hsp: tc.headStateProvider,
hsp: &mockHeadStateProvider{},
fc: &mockForkchoicer{
DependentRootForEpochCB: fcReturnsDependentRoot(),
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
},
},
}
@@ -432,7 +405,7 @@ func TestValidProposerSignature(t *testing.T) {
func TestDataColumnsSidecarParentSeen(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -536,7 +509,7 @@ func TestDataColumnsSidecarParentValid(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -657,7 +630,7 @@ func TestDataColumnsSidecarDescendsFromFinalized(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -720,7 +693,7 @@ func TestDataColumnsSidecarInclusionProven(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -775,7 +748,7 @@ func TestDataColumnsSidecarKzgProofVerified(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
const (
columnSlot = 97
columnSlot = 0
blobCount = 1
)
@@ -951,135 +924,3 @@ func TestColumnRequirementSatisfaction(t *testing.T) {
require.NoError(t, err)
}
func TestGetVerifyingStateEdgeCases(t *testing.T) {
const (
columnSlot = 97 // epoch 3
blobCount = 1
)
parentRoot := [fieldparams.RootLength]byte{}
columns := GenerateTestDataColumns(t, parentRoot, columnSlot, blobCount)
// Create a proper Fulu state for verification.
numValidators := max(uint64(columns[0].ProposerIndex()+1), 64)
fuluState, _ := util.DeterministicGenesisStateFulu(t, numValidators)
t.Run("different dependent roots - uses StateByRoot path", func(t *testing.T) {
// Parent and head are on different forks with different dependent roots.
// This forces the code to use TargetRootForEpoch -> StateByRoot path.
signatureCache := &mockSignatureCache{
svcb: func(signatureData signatureData) (bool, error) {
return false, nil // Cache miss
},
vscb: func(signatureData signatureData, _ validatorAtIndexer) (err error) {
return nil // Signature valid
},
}
// StateByRoot will be called because dependent roots differ
stateByRootCalled := false
stateByRooter := &mockStateByRooter{
sbr: func(_ context.Context, root [32]byte) (state.BeaconState, error) {
stateByRootCalled = true
return fuluState, nil
},
}
initializer := Initializer{
shared: &sharedResources{
sc: signatureCache,
sr: stateByRooter,
hsp: &mockHeadStateProvider{
headRoot: []byte{0xff}, // Different from parentRoot
headSlot: columnSlot,
},
fc: &mockForkchoicer{
// Return different roots for parent vs head to simulate different forks
DependentRootForEpochCB: func(root [32]byte, epoch primitives.Epoch) ([32]byte, error) {
return root, nil // Returns input, so parent [0...] != head [0xff...]
},
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
},
},
}
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
err := verifier.ValidProposerSignature(t.Context())
require.NoError(t, err)
require.Equal(t, true, stateByRootCalled, "StateByRoot should be called when dependent roots differ")
})
t.Run("same dependent root head far ahead - uses head state with ProcessSlots", func(t *testing.T) {
// Parent is ancestor of head on same chain, but head is in epoch 1 while column is in epoch 3.
// headEpoch (1) + 1 < dataColumnEpoch (3), so ProcessSlots is called on head state.
signatureCache := &mockSignatureCache{
svcb: func(signatureData signatureData) (bool, error) {
return false, nil // Cache miss
},
vscb: func(signatureData signatureData, _ validatorAtIndexer) (err error) {
return nil // Signature valid
},
}
headStateCalled := false
initializer := Initializer{
shared: &sharedResources{
sc: signatureCache,
sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}, // Should not be called
hsp: &mockHeadStateProvider{
headRoot: parentRoot[:], // Same as parent
headSlot: 32, // Epoch 1
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
headStateReadOnly: nil, // Should not use ReadOnly path
},
fc: &mockForkchoicer{
// Return same root for both to simulate same chain
DependentRootForEpochCB: func(root [32]byte, epoch primitives.Epoch) ([32]byte, error) {
return [32]byte{0xaa}, nil // Same for all inputs
},
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
},
},
}
// Wrap to detect HeadState call
originalHsp := initializer.shared.hsp.(*mockHeadStateProvider)
wrappedHsp := &mockHeadStateProvider{
headRoot: originalHsp.headRoot,
headSlot: originalHsp.headSlot,
headState: originalHsp.headState,
}
initializer.shared.hsp = &headStateCallTracker{
mockHeadStateProvider: wrappedHsp,
headStateCalled: &headStateCalled,
}
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
err := verifier.ValidProposerSignature(t.Context())
require.NoError(t, err)
require.Equal(t, true, headStateCalled, "HeadState should be called when head is far ahead")
})
}
// headStateCallTracker wraps mockHeadStateProvider to track HeadState calls.
type headStateCallTracker struct {
*mockHeadStateProvider
headStateCalled *bool
}
func (h *headStateCallTracker) HeadState(ctx context.Context) (state.BeaconState, error) {
*h.headStateCalled = true
return h.mockHeadStateProvider.HeadState(ctx)
}
func (h *headStateCallTracker) HeadRoot(ctx context.Context) ([]byte, error) {
return h.mockHeadStateProvider.HeadRoot(ctx)
}
func (h *headStateCallTracker) HeadSlot() primitives.Slot {
return h.mockHeadStateProvider.HeadSlot()
}
func (h *headStateCallTracker) HeadStateReadOnly(ctx context.Context) (state.ReadOnlyBeaconState, error) {
return h.mockHeadStateProvider.HeadStateReadOnly(ctx)
}

View File

@@ -1,3 +0,0 @@
### Removed
- Batching of KZG verification for incoming via gossip data column sidecars

View File

@@ -1,2 +0,0 @@
### Added
- `commitment_count_in_gossip_processed_blocks` gauge metric to track the number of blob KZG commitments in processed beacon blocks.

View File

@@ -1,3 +0,0 @@
### Fixed
- fixed broken and old links to actual

View File

@@ -1,2 +0,0 @@
### Changed
- Use dependent root and target root to verify data column proposer index.

View File

@@ -0,0 +1,3 @@
### Changed
- Avoid unnessary heap allocation while calling MixInLength