Refactor batch verifier for sharing across packages (#13812)

* refactor batch verifier to share with pending queue

* unit test for batch verifier

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2024-03-27 07:36:17 -05:00
committed by GitHub
parent 97edffaff5
commit cdd1d819df
9 changed files with 224 additions and 32 deletions

View File

@@ -12,14 +12,12 @@ go_library(
"log.go", "log.go",
"round_robin.go", "round_robin.go",
"service.go", "service.go",
"verification.go",
], ],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync", importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"], visibility = ["//beacon-chain:__subpackages__"],
deps = [ deps = [
"//async/abool:go_default_library", "//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library", "//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/transition:go_default_library", "//beacon-chain/core/transition:go_default_library",
@@ -41,7 +39,6 @@ go_library(
"//consensus-types/primitives:go_default_library", "//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library", "//container/leaky-bucket:go_default_library",
"//crypto/rand:go_default_library", "//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library", "//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library", "//runtime:go_default_library",

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das" "github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -167,7 +168,7 @@ func (s *Service) processFetchedDataRegSync(
if len(bwb) == 0 { if len(bwb) == 0 {
return return
} }
bv := newBlobBatchVerifier(s.newBlobVerifier) bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
batchFields := logrus.Fields{ batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(), "firstSlot": data.bwb[0].Block.Block().Slot(),
@@ -326,7 +327,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot()) errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
} }
bv := newBlobBatchVerifier(s.newBlobVerifier) bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb)) s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb { for _, bb := range bwb {

View File

@@ -340,7 +340,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if len(sidecars) != len(req) { if len(sidecars) != len(req) {
continue continue
} }
bv := newBlobBatchVerifier(s.newBlobVerifier) bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot() current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil { if err := avs.Persist(current, sidecars...); err != nil {
@@ -362,3 +362,9 @@ func shufflePeers(pids []peer.ID) {
pids[i], pids[j] = pids[j], pids[i] pids[i], pids[j] = pids[j], pids[i]
}) })
} }
func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}

View File

@@ -151,14 +151,14 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
if len(sidecars) != len(request) { if len(sidecars) != len(request) {
return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request)) return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request))
} }
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueSidecarRequirements)
for _, sidecar := range sidecars { for _, sidecar := range sidecars {
if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil { if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil {
return err return err
} }
log.WithFields(blobFields(sidecar)).Debug("Received blob sidecar RPC") log.WithFields(blobFields(sidecar)).Debug("Received blob sidecar RPC")
} }
vscs, err := bv.VerifiedROBlobs(ctx, RoBlock, sidecars)
vscs, err := verification.BlobSidecarSliceNoop(sidecars)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"batch.go",
"blob.go", "blob.go",
"cache.go", "cache.go",
"error.go", "error.go",
@@ -45,6 +46,7 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"batch_test.go",
"blob_test.go", "blob_test.go",
"cache_test.go", "cache_test.go",
"initializer_test.go", "initializer_test.go",
@@ -69,5 +71,6 @@ go_test(
"//testing/util:go_default_library", "//testing/util:go_default_library",
"//time/slots:go_default_library", "//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
], ],
) )

View File

@@ -1,12 +1,10 @@
package initialsync package verification
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
) )
@@ -20,21 +18,17 @@ var (
ErrBatchBlockRootMismatch = errors.New("Sidecar block header root does not match signed block") ErrBatchBlockRootMismatch = errors.New("Sidecar block header root does not match signed block")
) )
func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { // NewBlobBatchVerifier initializes a blob batch verifier. It requires the caller to correctly specify
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { // verification Requirements and to also pass in a NewBlobVerifier, which is a callback function that
return ini.NewBlobVerifier(b, reqs) // returns a new BlobVerifier for handling a single blob in the batch.
} func NewBlobBatchVerifier(newVerifier NewBlobVerifier, reqs []Requirement) *BlobBatchVerifier {
}
func newBlobBatchVerifier(newVerifier verification.NewBlobVerifier) *BlobBatchVerifier {
return &BlobBatchVerifier{ return &BlobBatchVerifier{
verifyKzg: kzg.Verify, verifyKzg: kzg.Verify,
newVerifier: newVerifier, newVerifier: newVerifier,
reqs: reqs,
} }
} }
type kzgVerifier func(b ...blocks.ROBlob) error
// BlobBatchVerifier solves problems that come from verifying batches of blobs from RPC. // BlobBatchVerifier solves problems that come from verifying batches of blobs from RPC.
// First: we only update forkchoice after the entire batch has completed, so the n+1 elements in the batch // First: we only update forkchoice after the entire batch has completed, so the n+1 elements in the batch
// won't be in forkchoice yet. // won't be in forkchoice yet.
@@ -42,18 +36,17 @@ type kzgVerifier func(b ...blocks.ROBlob) error
// method to BlobVerifier to verify the kzg commitments of all blob sidecars for a block together, then using the cached // method to BlobVerifier to verify the kzg commitments of all blob sidecars for a block together, then using the cached
// result of the batch verification when verifying the individual blobs. // result of the batch verification when verifying the individual blobs.
type BlobBatchVerifier struct { type BlobBatchVerifier struct {
verifyKzg kzgVerifier verifyKzg roblobCommitmentVerifier
newVerifier verification.NewBlobVerifier newVerifier NewBlobVerifier
reqs []Requirement
} }
var _ das.BlobBatchVerifier = &BlobBatchVerifier{} // VerifiedROBlobs satisfies the das.BlobBatchVerifier interface, used by das.AvailabilityStore.
func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.ROBlock, scs []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.ROBlock, scs []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
if len(scs) == 0 { if len(scs) == 0 {
return nil, nil return nil, nil
} }
// We assume the proposer was validated wrt the block in batch block processing before performing the DA check. // We assume the proposer is validated wrt the block in batch block processing before performing the DA check.
// So at this stage we just need to make sure the value being signed and signature bytes match the block. // So at this stage we just need to make sure the value being signed and signature bytes match the block.
for i := range scs { for i := range scs {
if blk.Signature() != bytesutil.ToBytes96(scs[i].SignedBlockHeader.Signature) { if blk.Signature() != bytesutil.ToBytes96(scs[i].SignedBlockHeader.Signature) {
@@ -71,7 +64,7 @@ func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.
} }
vs := make([]blocks.VerifiedROBlob, len(scs)) vs := make([]blocks.VerifiedROBlob, len(scs))
for i := range scs { for i := range scs {
vb, err := batch.verifyOneBlob(ctx, scs[i]) vb, err := batch.verifyOneBlob(scs[i])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -80,13 +73,13 @@ func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.
return vs, nil return vs, nil
} }
func (batch *BlobBatchVerifier) verifyOneBlob(ctx context.Context, sc blocks.ROBlob) (blocks.VerifiedROBlob, error) { func (batch *BlobBatchVerifier) verifyOneBlob(sc blocks.ROBlob) (blocks.VerifiedROBlob, error) {
vb := blocks.VerifiedROBlob{} vb := blocks.VerifiedROBlob{}
bv := batch.newVerifier(sc, verification.InitsyncSidecarRequirements) bv := batch.newVerifier(sc, batch.reqs)
// We can satisfy the following 2 requirements immediately because VerifiedROBlobs always verifies commitments // We can satisfy the following 2 requirements immediately because VerifiedROBlobs always verifies commitments
// and block signature for all blobs in the batch before calling verifyOneBlob. // and block signature for all blobs in the batch before calling verifyOneBlob.
bv.SatisfyRequirement(verification.RequireSidecarKzgProofVerified) bv.SatisfyRequirement(RequireSidecarKzgProofVerified)
bv.SatisfyRequirement(verification.RequireValidProposerSignature) bv.SatisfyRequirement(RequireValidProposerSignature)
if err := bv.BlobIndexInBounds(); err != nil { if err := bv.BlobIndexInBounds(); err != nil {
return vb, err return vb, err

View File

@@ -0,0 +1,189 @@
package verification
import (
"context"
"testing"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/stretchr/testify/require"
)
func TestBatchVerifier(t *testing.T) {
ctx := context.Background()
mockCV := func(err error) roblobCommitmentVerifier {
return func(...blocks.ROBlob) error {
return err
}
}
var invCmtErr = errors.New("mock invalid commitment")
type vbcbt func() (blocks.VerifiedROBlob, error)
vbcb := func(bl blocks.ROBlob, err error) vbcbt {
return func() (blocks.VerifiedROBlob, error) {
return blocks.VerifiedROBlob{ROBlob: bl}, err
}
}
cases := []struct {
name string
nv func() NewBlobVerifier
cv roblobCommitmentVerifier
bandb func(t *testing.T, n int) (blocks.ROBlock, []blocks.ROBlob)
err error
nblobs int
reqs []Requirement
}{
{
name: "no blobs",
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
nblobs: 0,
},
{
name: "happy path",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 3,
},
{
name: "partial batch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
// Add extra blobs to the block that we won't return
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb+3)
return blk, blbs[0:3]
},
nblobs: 3,
},
{
name: "invalid commitment",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
cv: mockCV(invCmtErr),
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
err: invCmtErr,
nblobs: 1,
},
{
name: "signature mismatch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
blbs[0].SignedBlockHeader.Signature = []byte("wrong")
return blk, blbs
},
err: ErrBatchSignatureMismatch,
nblobs: 2,
},
{
name: "root mismatch",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
blk, blbs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
wr, err := blocks.NewROBlobWithRoot(blbs[0].BlobSidecar, bytesutil.ToBytes32([]byte("wrong")))
require.NoError(t, err)
blbs[0] = wr
return blk, blbs
},
err: ErrBatchBlockRootMismatch,
nblobs: 1,
},
{
name: "idx oob",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{
ErrBlobIndexInBounds: ErrBlobIndexInvalid,
cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 1,
err: ErrBlobIndexInvalid,
},
{
name: "inclusion proof invalid",
nv: func() NewBlobVerifier {
return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier {
return &MockBlobVerifier{
ErrSidecarInclusionProven: ErrSidecarInclusionProofInvalid,
cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
t.Fatal("Batch verifier should stop before this point")
return blocks.VerifiedROBlob{}, nil
}}
}
},
bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) {
return util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, nb)
},
nblobs: 1,
err: ErrSidecarInclusionProofInvalid,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
blk, blbs := c.bandb(t, c.nblobs)
reqs := c.reqs
if reqs == nil {
reqs = InitsyncSidecarRequirements
}
bbv := NewBlobBatchVerifier(c.nv(), reqs)
if c.cv == nil {
bbv.verifyKzg = mockCV(nil)
} else {
bbv.verifyKzg = c.cv
}
vb, err := bbv.VerifiedROBlobs(ctx, blk, blbs)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
require.Equal(t, c.nblobs, len(vb))
})
}
}

View File

@@ -70,6 +70,9 @@ var InitsyncSidecarRequirements = requirementList(GossipSidecarRequirements).exc
// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements. // BackfillSidecarRequirements is the same as InitsyncSidecarRequirements.
var BackfillSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding() var BackfillSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()
// PendingQueueSidecarRequirements is the same as InitsyncSidecarRequirements, used by the pending blocks queue.
var PendingQueueSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()
var ( var (
ErrBlobInvalid = errors.New("blob failed verification") ErrBlobInvalid = errors.New("blob failed verification")
// ErrBlobIndexInvalid means RequireBlobIndexInBounds failed. // ErrBlobIndexInvalid means RequireBlobIndexInBounds failed.

View File

@@ -39,7 +39,7 @@ func TestResultList(t *testing.T) {
func TestExportedBlobSanityCheck(t *testing.T) { func TestExportedBlobSanityCheck(t *testing.T) {
// make sure all requirement lists contain the bare minimum checks // make sure all requirement lists contain the bare minimum checks
sanity := []Requirement{RequireValidProposerSignature, RequireSidecarKzgProofVerified, RequireBlobIndexInBounds, RequireSidecarInclusionProven} sanity := []Requirement{RequireValidProposerSignature, RequireSidecarKzgProofVerified, RequireBlobIndexInBounds, RequireSidecarInclusionProven}
reqs := [][]Requirement{GossipSidecarRequirements, SpectestSidecarRequirements, InitsyncSidecarRequirements, BackfillSidecarRequirements} reqs := [][]Requirement{GossipSidecarRequirements, SpectestSidecarRequirements, InitsyncSidecarRequirements, BackfillSidecarRequirements, PendingQueueSidecarRequirements}
for i := range reqs { for i := range reqs {
r := reqs[i] r := reqs[i]
reqMap := make(map[Requirement]struct{}) reqMap := make(map[Requirement]struct{})