Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
bd785e7036 Implement KZG proof batch verification for data column gossip validation 2025-08-20 16:14:24 -07:00
10 changed files with 122 additions and 10 deletions

View File

@@ -822,6 +822,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithSlasherEnabled(b.slasherEnabled),
regularsync.WithLightClientStore(b.lcStore),
regularsync.WithBatchVerifierLimit(b.cliCtx.Int(flags.BatchVerifierLimit.Name)),
regularsync.WithKzgBatchVerifierLimit(b.cliCtx.Int(flags.KzgBatchVerifierLimit.Name)),
)
return b.services.RegisterService(rs)
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
@@ -11,13 +13,21 @@ import (
"github.com/pkg/errors"
)
const signatureVerificationInterval = 50 * time.Millisecond
const (
signatureVerificationInterval = 50 * time.Millisecond
kzgVerificationInterval = 250 * time.Millisecond
)
type signatureVerifier struct {
set *bls.SignatureBatch
resChan chan error
}
type kzgVerifier struct {
dataColumns []blocks.RODataColumn
resChan chan error
}
// A routine that runs in the background to perform batch
// verifications of incoming messages from gossip.
func (s *Service) verifierRoutine() {
@@ -47,6 +57,34 @@ func (s *Service) verifierRoutine() {
}
}
// A routine that runs in the background to perform batch
// KZG verifications of data column sidecars from gossip.
func (s *Service) kzgVerifierRoutine() {
kzgBatch := make([]*kzgVerifier, 0)
ticker := time.NewTicker(kzgVerificationInterval)
for {
select {
case <-s.ctx.Done():
ticker.Stop()
for i := 0; i < len(kzgBatch); i++ {
kzgBatch[i].resChan <- s.ctx.Err()
}
return
case kzg := <-s.kzgChan:
kzgBatch = append(kzgBatch, kzg)
if len(kzgBatch) >= s.cfg.kzgBatchVerifierLimit {
verifyKzgBatch(kzgBatch)
kzgBatch = []*kzgVerifier{}
}
case <-ticker.C:
if len(kzgBatch) > 0 {
verifyKzgBatch(kzgBatch)
kzgBatch = []*kzgVerifier{}
}
}
}
}
func (s *Service) validateWithBatchVerifier(ctx context.Context, message string, set *bls.SignatureBatch) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateWithBatchVerifier")
defer span.End()
@@ -120,3 +158,47 @@ func performBatchAggregation(aggSet *bls.SignatureBatch) (*bls.SignatureBatch, e
}
return aggSet, nil
}
func (s *Service) validateWithKzgBatchVerifier(ctx context.Context, message string, dataColumns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateWithKzgBatchVerifier")
defer span.End()
resChan := make(chan error)
verificationSet := &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
s.kzgChan <- verificationSet
resErr := <-resChan
close(resChan)
if resErr != nil {
log.WithError(resErr).Tracef("Could not perform batch verification of %s", message)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(dataColumns)
if err != nil {
verErr := errors.Wrapf(err, "Could not verify %s", message)
tracing.AnnotateError(span, verErr)
return pubsub.ValidationReject, verErr
}
}
return pubsub.ValidationAccept, nil
}
func verifyKzgBatch(kzgBatch []*kzgVerifier) {
if len(kzgBatch) == 0 {
return
}
allDataColumns := make([]blocks.RODataColumn, 0)
for _, kzgVerifier := range kzgBatch {
allDataColumns = append(allDataColumns, kzgVerifier.dataColumns...)
}
var verificationErr error
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allDataColumns)
if err != nil {
verificationErr = errors.Wrap(err, "batch KZG verification failed")
}
// Send the same result to all verifiers in the batch
for i := 0; i < len(kzgBatch); i++ {
kzgBatch[i].resChan <- verificationErr
}
}

View File

@@ -229,3 +229,11 @@ func WithBatchVerifierLimit(limit int) Option {
return nil
}
}
// WithKzgBatchVerifierLimit sets the maximum number of KZG proofs to batch verify at once.
func WithKzgBatchVerifierLimit(limit int) Option {
return func(s *Service) error {
s.cfg.kzgBatchVerifierLimit = limit
return nil
}
}

View File

@@ -106,6 +106,7 @@ type config struct {
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
batchVerifierLimit int
kzgBatchVerifierLimit int
}
// This defines the interface for interacting with block chain service
@@ -164,6 +165,7 @@ type Service struct {
syncContributionBitsOverlapLock sync.RWMutex
syncContributionBitsOverlapCache *lru.Cache
signatureChan chan *signatureVerifier
kzgChan chan *kzgVerifier
clockWaiter startup.ClockWaiter
initialSyncComplete chan struct{}
verifierWaiter *verification.InitializerWaiter
@@ -202,6 +204,8 @@ func NewService(ctx context.Context, opts ...Option) *Service {
}
// Initialize signature channel with configured limit
r.signatureChan = make(chan *signatureVerifier, r.cfg.batchVerifierLimit)
// Initialize KZG channel with configured limit
r.kzgChan = make(chan *kzgVerifier, r.cfg.kzgBatchVerifierLimit)
// Correctly remove it from our seen pending block map.
// The eviction method always assumes that the mutex is held.
r.slotToPendingBlocks.OnEvicted(func(s string, i interface{}) {
@@ -254,6 +258,7 @@ func (s *Service) Start() {
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
go s.verifierRoutine()
go s.kzgVerifierRoutine()
go s.startTasksPostInitialSync()
go s.processDataColumnLogs()

View File

@@ -145,9 +145,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
}
// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
if err := verifier.SidecarKzgProofVerified(); err != nil {
return pubsub.ValidationReject, err
validationResult, err := s.validateWithKzgBatchVerifier(ctx, "data column KZG proof", roDataColumns)
if validationResult != pubsub.ValidationAccept {
return validationResult, err
}
// Mark KZG verification as satisfied since we did it via batch verifier
verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
// with valid header signature, sidecar inclusion proof, and kzg proof.

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
@@ -25,6 +26,9 @@ import (
)
func TestValidateDataColumn(t *testing.T) {
err := kzg.Start()
require.NoError(t, err)
ctx := t.Context()
t.Run("from self", func(t *testing.T) {
@@ -63,10 +67,14 @@ func TestValidateDataColumn(t *testing.T) {
clock := startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)
service := &Service{
cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: clock, chain: chainService},
cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: clock, chain: chainService, batchVerifierLimit: 10, kzgBatchVerifierLimit: 10},
ctx: ctx,
newColumnsVerifier: newDataColumnsVerifier,
seenDataColumnCache: newSlotAwareCache(seenDataColumnSize),
kzgChan: make(chan *kzgVerifier, 10),
}
// Start the KZG verifier routine for batch verification
go service.kzgVerifierRoutine()
// Encode a `beaconBlock` message instead of expected.
buf := new(bytes.Buffer)
@@ -177,12 +185,6 @@ func TestValidateDataColumn(t *testing.T) {
expectedResult: pubsub.ValidationReject,
expectedError: genericError,
},
{
name: "sidecar kzg proof verified",
verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarKzgProofVerified: genericError}),
expectedResult: pubsub.ValidationReject,
expectedError: genericError,
},
{
name: "sidecar proposer expected",
verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarProposerExpected: genericError}),

View File

@@ -0,0 +1,3 @@
### Added
- KZG proof batch verification for data column gossip validation

View File

@@ -344,4 +344,10 @@ var (
Usage: "Maximum number of signatures to batch verify at once for beacon attestation p2p gossip.",
Value: 1000,
}
// KzgBatchVerifierLimit sets the maximum number of KZG proofs to batch verify at once.
KzgBatchVerifierLimit = &cli.IntFlag{
Name: "kzg-batch-verifier-limit",
Usage: "Maximum number of KZG proofs to batch verify at once for data column p2p gossip.",
Value: 25,
}
)

View File

@@ -151,6 +151,7 @@ var appFlags = []cli.Flag{
bflags.BackfillWorkerCount,
bflags.BackfillOldestSlot,
flags.BatchVerifierLimit,
flags.KzgBatchVerifierLimit,
}
func init() {

View File

@@ -73,6 +73,7 @@ var appHelpFlagGroups = []flagGroup{
flags.RPCHost,
flags.RPCPort,
flags.BatchVerifierLimit,
flags.KzgBatchVerifierLimit,
},
},
{