mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
1 Commits
cache-atte
...
no-batchin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4abfea99ce |
@@ -4,9 +4,6 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/crypto/bls"
|
||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
|
||||
@@ -56,32 +53,6 @@ func (s *Service) verifierRoutine() {
|
||||
}
|
||||
}
|
||||
|
||||
// A routine that runs in the background to perform batch
|
||||
// KZG verifications by draining the channel and processing all pending requests.
|
||||
func (s *Service) kzgVerifierRoutine() {
|
||||
for {
|
||||
kzgBatch := make([]*kzgVerifier, 0, 1)
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case kzg := <-s.kzgChan:
|
||||
kzgBatch = append(kzgBatch, kzg)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case kzg := <-s.kzgChan:
|
||||
kzgBatch = append(kzgBatch, kzg)
|
||||
continue
|
||||
default:
|
||||
verifyKzgBatch(kzgBatch)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) validateWithBatchVerifier(ctx context.Context, message string, set *bls.SignatureBatch) (pubsub.ValidationResult, error) {
|
||||
_, span := trace.StartSpan(ctx, "sync.validateWithBatchVerifier")
|
||||
defer span.End()
|
||||
@@ -154,71 +125,3 @@ func performBatchAggregation(aggSet *bls.SignatureBatch) (*bls.SignatureBatch, e
|
||||
}
|
||||
return aggSet, nil
|
||||
}
|
||||
|
||||
func (s *Service) validateWithKzgBatchVerifier(ctx context.Context, dataColumns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
|
||||
_, span := trace.StartSpan(ctx, "sync.validateWithKzgBatchVerifier")
|
||||
defer span.End()
|
||||
|
||||
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
|
||||
resChan := make(chan error, 1)
|
||||
verificationSet := &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case s.kzgChan <- verificationSet:
|
||||
case <-ctx.Done():
|
||||
return pubsub.ValidationIgnore, ctx.Err()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return pubsub.ValidationIgnore, ctx.Err() // parent context canceled, give up
|
||||
case err := <-resChan:
|
||||
if err != nil {
|
||||
log.WithError(err).Trace("Could not perform batch verification")
|
||||
tracing.AnnotateError(span, err)
|
||||
return s.validateUnbatchedColumnsKzg(ctx, dataColumns)
|
||||
}
|
||||
}
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
func (s *Service) validateUnbatchedColumnsKzg(ctx context.Context, columns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
|
||||
_, span := trace.StartSpan(ctx, "sync.validateUnbatchedColumnsKzg")
|
||||
defer span.End()
|
||||
start := time.Now()
|
||||
if err := peerdas.VerifyDataColumnsSidecarKZGProofs(columns); err != nil {
|
||||
err = errors.Wrap(err, "could not verify")
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
verification.DataColumnBatchKZGVerificationHistogram.WithLabelValues("fallback").Observe(float64(time.Since(start).Milliseconds()))
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
func verifyKzgBatch(kzgBatch []*kzgVerifier) {
|
||||
if len(kzgBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
allDataColumns := make([]blocks.RODataColumn, 0, len(kzgBatch))
|
||||
for _, kzgVerifier := range kzgBatch {
|
||||
allDataColumns = append(allDataColumns, kzgVerifier.dataColumns...)
|
||||
}
|
||||
|
||||
var verificationErr error
|
||||
start := time.Now()
|
||||
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allDataColumns)
|
||||
if err != nil {
|
||||
verificationErr = errors.Wrap(err, "batch KZG verification failed")
|
||||
} else {
|
||||
verification.DataColumnBatchKZGVerificationHistogram.WithLabelValues("batch").Observe(float64(time.Since(start).Milliseconds()))
|
||||
}
|
||||
|
||||
// Send the same result to all verifiers in the batch
|
||||
for _, verifier := range kzgBatch {
|
||||
verifier.resChan <- verificationErr
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,339 +1,14 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/util"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
func TestValidateWithKzgBatchVerifier(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
dataColumns []blocks.RODataColumn
|
||||
expectedResult pubsub.ValidationResult
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "single valid data column",
|
||||
dataColumns: createValidTestDataColumns(t, 1),
|
||||
expectedResult: pubsub.ValidationAccept,
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "multiple valid data columns",
|
||||
dataColumns: createValidTestDataColumns(t, 3),
|
||||
expectedResult: pubsub.ValidationAccept,
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "single invalid data column",
|
||||
dataColumns: createInvalidTestDataColumns(t, 1),
|
||||
expectedResult: pubsub.ValidationReject,
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "empty data column slice",
|
||||
dataColumns: []blocks.RODataColumn{},
|
||||
expectedResult: pubsub.ValidationAccept,
|
||||
expectError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
result, err := service.validateWithKzgBatchVerifier(ctx, tt.dataColumns)
|
||||
|
||||
require.Equal(t, tt.expectedResult, result)
|
||||
if tt.expectError {
|
||||
assert.NotNil(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestVerifierRoutine(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("processes single request", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
dataColumns := createValidTestDataColumns(t, 1)
|
||||
resChan := make(chan error, 1)
|
||||
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
|
||||
|
||||
select {
|
||||
case err := <-resChan:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for verification result")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("batches multiple requests", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
const numRequests = 5
|
||||
resChans := make([]chan error, numRequests)
|
||||
|
||||
for i := range numRequests {
|
||||
dataColumns := createValidTestDataColumns(t, 1)
|
||||
resChan := make(chan error, 1)
|
||||
resChans[i] = resChan
|
||||
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
|
||||
}
|
||||
|
||||
for i := range numRequests {
|
||||
select {
|
||||
case err := <-resChans[i]:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout waiting for verification result %d", i)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("context cancellation stops routine", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
|
||||
routineDone := make(chan struct{})
|
||||
go func() {
|
||||
service.kzgVerifierRoutine()
|
||||
close(routineDone)
|
||||
}()
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-routineDone:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for routine to exit")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestVerifyKzgBatch(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("all valid data columns succeed", func(t *testing.T) {
|
||||
dataColumns := createValidTestDataColumns(t, 3)
|
||||
resChan := make(chan error, 1)
|
||||
kzgVerifiers := []*kzgVerifier{{dataColumns: dataColumns, resChan: resChan}}
|
||||
|
||||
verifyKzgBatch(kzgVerifiers)
|
||||
|
||||
select {
|
||||
case err := <-resChan:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for batch verification")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("invalid proofs fail entire batch", func(t *testing.T) {
|
||||
validColumns := createValidTestDataColumns(t, 1)
|
||||
invalidColumns := createInvalidTestDataColumns(t, 1)
|
||||
allColumns := append(validColumns, invalidColumns...)
|
||||
|
||||
resChan := make(chan error, 1)
|
||||
kzgVerifiers := []*kzgVerifier{{dataColumns: allColumns, resChan: resChan}}
|
||||
|
||||
verifyKzgBatch(kzgVerifiers)
|
||||
|
||||
select {
|
||||
case err := <-resChan:
|
||||
assert.NotNil(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for batch verification")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty batch handling", func(t *testing.T) {
|
||||
verifyKzgBatch([]*kzgVerifier{})
|
||||
})
|
||||
}
|
||||
|
||||
func TestKzgBatchVerifierConcurrency(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
const numGoroutines = 10
|
||||
const numRequestsPerGoroutine = 5
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numGoroutines)
|
||||
|
||||
// Multiple goroutines sending verification requests simultaneously
|
||||
for i := range numGoroutines {
|
||||
go func(goroutineID int) {
|
||||
defer wg.Done()
|
||||
|
||||
for range numRequestsPerGoroutine {
|
||||
dataColumns := createValidTestDataColumns(t, 1)
|
||||
result, err := service.validateWithKzgBatchVerifier(ctx, dataColumns)
|
||||
require.Equal(t, pubsub.ValidationAccept, result)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestKzgBatchVerifierFallback(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("fallback handles mixed valid/invalid batch correctly", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
validColumns := createValidTestDataColumns(t, 1)
|
||||
invalidColumns := createInvalidTestDataColumns(t, 1)
|
||||
|
||||
result, err := service.validateWithKzgBatchVerifier(ctx, validColumns)
|
||||
require.Equal(t, pubsub.ValidationAccept, result)
|
||||
require.NoError(t, err)
|
||||
|
||||
result, err = service.validateWithKzgBatchVerifier(ctx, invalidColumns)
|
||||
require.Equal(t, pubsub.ValidationReject, result)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("empty data columns fallback", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
result, err := service.validateWithKzgBatchVerifier(ctx, []blocks.RODataColumn{})
|
||||
require.Equal(t, pubsub.ValidationAccept, result)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestValidateWithKzgBatchVerifier_DeadlockOnTimeout(t *testing.T) {
|
||||
err := kzg.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig().Copy()
|
||||
cfg.SecondsPerSlot = 0
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
defer cancel()
|
||||
|
||||
service := &Service{
|
||||
ctx: ctx,
|
||||
kzgChan: make(chan *kzgVerifier),
|
||||
}
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
result, err := service.validateWithKzgBatchVerifier(context.Background(), nil)
|
||||
require.Equal(t, pubsub.ValidationIgnore, result)
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
_, _ = service.validateWithKzgBatchVerifier(context.Background(), nil)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("validateWithKzgBatchVerifier blocked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateWithKzgBatchVerifier_ContextCanceledBeforeSend(t *testing.T) {
|
||||
cancelledCtx, cancel := context.WithCancel(t.Context())
|
||||
cancel()
|
||||
|
||||
service := &Service{
|
||||
ctx: context.Background(),
|
||||
kzgChan: make(chan *kzgVerifier),
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
result, err := service.validateWithKzgBatchVerifier(cancelledCtx, nil)
|
||||
require.Equal(t, pubsub.ValidationIgnore, result)
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("validateWithKzgBatchVerifier did not return after context cancellation")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-service.kzgChan:
|
||||
t.Fatal("verificationSet was sent to kzgChan despite canceled context")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func createValidTestDataColumns(t *testing.T, count int) []blocks.RODataColumn {
|
||||
_, roSidecars, _ := util.GenerateTestFuluBlockWithSidecars(t, count)
|
||||
if len(roSidecars) >= count {
|
||||
|
||||
@@ -165,7 +165,6 @@ type Service struct {
|
||||
syncContributionBitsOverlapLock sync.RWMutex
|
||||
syncContributionBitsOverlapCache *lru.Cache
|
||||
signatureChan chan *signatureVerifier
|
||||
kzgChan chan *kzgVerifier
|
||||
clockWaiter startup.ClockWaiter
|
||||
initialSyncComplete chan struct{}
|
||||
verifierWaiter *verification.InitializerWaiter
|
||||
@@ -206,10 +205,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
|
||||
}
|
||||
// Initialize signature channel with configured limit
|
||||
r.signatureChan = make(chan *signatureVerifier, r.cfg.batchVerifierLimit)
|
||||
// Initialize KZG channel with fixed buffer size of 100.
|
||||
// This buffer size is designed to handle burst traffic of data column gossip messages:
|
||||
// - Data columns arrive less frequently than attestations (default batchVerifierLimit=1000)
|
||||
r.kzgChan = make(chan *kzgVerifier, 100)
|
||||
|
||||
// Correctly remove it from our seen pending block map.
|
||||
// The eviction method always assumes that the mutex is held.
|
||||
r.slotToPendingBlocks.OnEvicted(func(s string, i any) {
|
||||
@@ -262,7 +258,6 @@ func (s *Service) Start() {
|
||||
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
|
||||
|
||||
go s.verifierRoutine()
|
||||
go s.kzgVerifierRoutine()
|
||||
go s.startDiscoveryAndSubscriptions()
|
||||
go s.processDataColumnLogs()
|
||||
|
||||
|
||||
@@ -144,12 +144,9 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
|
||||
}
|
||||
|
||||
// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
|
||||
validationResult, err := s.validateWithKzgBatchVerifier(ctx, roDataColumns)
|
||||
if validationResult != pubsub.ValidationAccept {
|
||||
return validationResult, err
|
||||
if err := verifier.SidecarKzgProofVerified(); err != nil {
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
// Mark KZG verification as satisfied since we did it via batch verifier
|
||||
verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
|
||||
|
||||
// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
|
||||
// with valid header signature, sidecar inclusion proof, and kzg proof.
|
||||
|
||||
@@ -71,10 +71,7 @@ func TestValidateDataColumn(t *testing.T) {
|
||||
ctx: ctx,
|
||||
newColumnsVerifier: newDataColumnsVerifier,
|
||||
seenDataColumnCache: newSlotAwareCache(seenDataColumnSize),
|
||||
kzgChan: make(chan *kzgVerifier, 100),
|
||||
}
|
||||
// Start the KZG verifier routine for batch verification
|
||||
go service.kzgVerifierRoutine()
|
||||
|
||||
// Encode a `beaconBlock` message instead of expected.
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
3
changelog/manu-kzg-batch.md
Normal file
3
changelog/manu-kzg-batch.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Removed
|
||||
|
||||
- Batching of KZG verification for incoming via gossip data column sidecars
|
||||
Reference in New Issue
Block a user