Implement Batch Verifier For Pubsub Objects (#9344)

* add initial commit

* add aggregates in

* fmt

* add new changes

* gaz

* fix copy

* terence's review

* fix broken build
This commit is contained in:
Nishant Das
2021-09-15 13:49:50 +08:00
committed by GitHub
parent 31a78ab22a
commit 34391fa780
14 changed files with 372 additions and 16 deletions

View File

@@ -121,7 +121,9 @@ func (ds *Server) getPeer(pid peer.ID) (*pbrpc.DebugPeerResponse, error) {
}
pStatus, err := peers.ChainState(pid)
if err != nil {
return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err)
// In the event chain state is non existent, we
// initialize with the zero value.
pStatus = new(ethpb.Status)
}
lastUpdated, err := peers.ChainStateLastUpdated(pid)
if err != nil {

View File

@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"batch_verifier.go",
"context.go",
"deadlines.go",
"decode_pubsub.go",
@@ -122,6 +123,7 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"batch_verifier_test.go",
"context_test.go",
"decode_pubsub_test.go",
"error_test.go",

View File

@@ -0,0 +1,101 @@
package sync
import (
"context"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
"github.com/prysmaticlabs/prysm/shared/bls"
"go.opencensus.io/trace"
)
const signatureVerificationInterval = 50 * time.Millisecond
const verifierLimit = 50
type signatureVerifier struct {
set *bls.SignatureSet
resChan chan error
}
// A routine that runs in the background to perform batch
// verifications of incoming messages from gossip.
func (s *Service) verifierRoutine() {
verifierBatch := make([]*signatureVerifier, 0)
ticker := time.NewTicker(signatureVerificationInterval)
for {
select {
case <-s.ctx.Done():
// Clean up currently utilised resources.
ticker.Stop()
for i := 0; i < len(verifierBatch); i++ {
verifierBatch[i].resChan <- s.ctx.Err()
}
return
case sig := <-s.signatureChan:
verifierBatch = append(verifierBatch, sig)
if len(verifierBatch) >= verifierLimit {
verifyBatch(verifierBatch)
verifierBatch = []*signatureVerifier{}
}
case <-ticker.C:
if len(verifierBatch) > 0 {
verifyBatch(verifierBatch)
verifierBatch = []*signatureVerifier{}
}
}
}
}
func (s *Service) validateWithBatchVerifier(ctx context.Context, message string, set *bls.SignatureSet) pubsub.ValidationResult {
ctx, span := trace.StartSpan(ctx, "sync.validateWithBatchVerifier")
defer span.End()
resChan := make(chan error)
verificationSet := &signatureVerifier{set: set.Copy(), resChan: resChan}
s.signatureChan <- verificationSet
resErr := <-resChan
close(resChan)
// If verification fails we fallback to individual verification
// of each signature set.
if resErr != nil {
log.WithError(resErr).Tracef("Could not perform batch verification of %s", message)
verified, err := set.Verify()
if err != nil {
log.WithError(err).Debugf("Could not verify %s", message)
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
if !verified {
log.Debugf("Verification of %s failed", message)
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
}
return pubsub.ValidationAccept
}
func verifyBatch(verifierBatch []*signatureVerifier) {
if verifierBatch == nil || len(verifierBatch) == 0 {
return
}
aggSet := verifierBatch[0].set
verificationErr := error(nil)
for i := 1; i < len(verifierBatch); i++ {
aggSet = aggSet.Join(verifierBatch[i].set)
}
verified, err := aggSet.Verify()
switch {
case err != nil:
verificationErr = err
case !verified:
verificationErr = errors.New("batch signature verification failed")
}
for i := 0; i < len(verifierBatch); i++ {
verifierBatch[i].resChan <- verificationErr
}
}

View File

@@ -0,0 +1,80 @@
package sync
import (
"context"
"testing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestValidateWithBatchVerifier(t *testing.T) {
_, keys, err := testutil.DeterministicDepositsAndKeys(10)
assert.NoError(t, err)
sig := keys[0].Sign(make([]byte, 32))
badSig := keys[1].Sign(make([]byte, 32))
validSet := &bls.SignatureSet{
Messages: [][32]byte{{}},
PublicKeys: []bls.PublicKey{keys[0].PublicKey()},
Signatures: [][]byte{sig.Marshal()},
}
invalidSet := &bls.SignatureSet{
Messages: [][32]byte{{}},
PublicKeys: []bls.PublicKey{keys[0].PublicKey()},
Signatures: [][]byte{badSig.Marshal()},
}
tests := []struct {
name string
message string
set *bls.SignatureSet
preFilledSets []*bls.SignatureSet
want pubsub.ValidationResult
}{
{
name: "empty queue",
message: "random",
set: validSet,
want: pubsub.ValidationAccept,
},
{
name: "invalid set",
message: "random",
set: invalidSet,
want: pubsub.ValidationReject,
},
{
name: "invalid set in routine with valid set",
message: "random",
set: validSet,
preFilledSets: []*bls.SignatureSet{invalidSet},
want: pubsub.ValidationAccept,
},
{
name: "valid set in routine with invalid set",
message: "random",
set: invalidSet,
preFilledSets: []*bls.SignatureSet{validSet},
want: pubsub.ValidationReject,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
svc := &Service{
ctx: ctx,
cancel: cancel,
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
go svc.verifierRoutine()
for _, st := range tt.preFilledSets {
svc.signatureChan <- &signatureVerifier{set: st, resChan: make(chan error, 10)}
}
if got := svc.validateWithBatchVerifier(context.Background(), tt.message, tt.set); got != tt.want {
t.Errorf("validateWithBatchVerifier() = %v, want %v", got, tt.want)
}
cancel()
})
}
}

View File

@@ -127,6 +127,7 @@ type Service struct {
seenSyncContributionCache *lru.Cache
badBlockCache *lru.Cache
badBlockLock sync.RWMutex
signatureChan chan *signatureVerifier
}
// NewService initializes new regular sync service.
@@ -145,9 +146,11 @@ func NewService(ctx context.Context, cfg *Config) *Service {
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
subHandler: newSubTopicHandler(),
rateLimiter: rLimiter,
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
go r.registerHandlers()
go r.verifierRoutine()
return r
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -179,6 +180,10 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
}
set := bls.NewSet()
set.Join(selectionSigSet).Join(aggregatorSigSet).Join(attSigSet)
if features.Get().EnableBatchVerification {
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
valid, err := set.Verify()
if err != nil {
tracing.AnnotateError(span, errors.Errorf("Could not join signature set"))
@@ -188,7 +193,6 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe
tracing.AnnotateError(span, errors.Errorf("Could not verify selection or aggregator or attestation signature"))
return pubsub.ValidationReject
}
return pubsub.ValidationAccept
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -187,12 +188,20 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a *eth.A
return pubsub.ValidationReject
}
if features.Get().EnableBatchVerification {
set, err := blocks.AttestationSignatureSet(ctx, bs, []*eth.Attestation{a})
if err != nil {
log.WithError(err).Debug("Could not create attestation signature set.")
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
return s.validateWithBatchVerifier(ctx, "attestation", set)
}
if err := blocks.VerifyAttestationSignature(ctx, bs, a); err != nil {
log.WithError(err).Debug("Could not verify attestation")
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
return pubsub.ValidationAccept
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -231,13 +232,6 @@ func (s *Service) rejectInvalidSyncCommitteeSignature(m *ethpb.SyncCommitteeMess
return pubsub.ValidationReject
}
// We reject a malformed signature from bytes according to the p2p specification.
blsSig, err := bls.SignatureFromBytes(m.Signature)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
// Ignore a malformed public key from bytes according to the p2p specification.
pKey, err := bls.PublicKeyFromBytes(pubKey[:])
if err != nil {
@@ -245,6 +239,25 @@ func (s *Service) rejectInvalidSyncCommitteeSignature(m *ethpb.SyncCommitteeMess
return pubsub.ValidationIgnore
}
// Batch verify message signature before unmarshalling
// the signature to a G2 point if batch verification is
// enabled.
if features.Get().EnableBatchVerification {
set := &bls.SignatureSet{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{pKey},
Signatures: [][]byte{m.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync committee message", set)
}
// We reject a malformed signature from bytes according to the p2p specification.
blsSig, err := bls.SignatureFromBytes(m.Signature)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
verified := blsSig.Verify(pKey, sigRoot[:])
if !verified {
return pubsub.ValidationReject

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"errors"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -9,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -201,6 +203,25 @@ func (s *Service) rejectInvalidContributionSignature(m *ethpb.SignedContribution
if err != nil {
return pubsub.ValidationIgnore
}
if features.Get().EnableBatchVerification {
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
root, err := helpers.ComputeSigningRoot(m.Message, d)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
set := &bls.SignatureSet{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync contribution signature", set)
}
if err := helpers.VerifySigningRoot(m.Message, pubkey[:], m.Signature, d); err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
@@ -216,6 +237,7 @@ func (s *Service) rejectInvalidSyncAggregateSignature(m *ethpb.SignedContributio
// The aggregate signature is valid for the message `beacon_block_root` and aggregate pubkey
// derived from the participation info in `aggregation_bits` for the subcommittee specified by the `contribution.subcommittee_index`.
activePubkeys := []bls.PublicKey{}
activeRawPubkeys := [][]byte{}
syncPubkeys, err := s.cfg.Chain.HeadSyncCommitteePubKeys(ctx, m.Message.Contribution.Slot, types.CommitteeIndex(m.Message.Contribution.SubcommitteeIndex))
if err != nil {
return pubsub.ValidationIgnore
@@ -234,13 +256,9 @@ func (s *Service) rejectInvalidSyncAggregateSignature(m *ethpb.SignedContributio
return pubsub.ValidationIgnore
}
activePubkeys = append(activePubkeys, pubK)
activeRawPubkeys = append(activeRawPubkeys, pk)
}
}
sig, err := bls.SignatureFromBytes(m.Message.Contribution.Signature)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
d, err := s.cfg.Chain.HeadSyncCommitteeDomain(ctx, m.Message.Contribution.Slot)
if err != nil {
tracing.AnnotateError(span, err)
@@ -252,6 +270,26 @@ func (s *Service) rejectInvalidSyncAggregateSignature(m *ethpb.SignedContributio
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore
}
// Aggregate pubkeys separately again to allow
// for signature sets to be created for batch verification.
if features.Get().EnableBatchVerification {
aggKey, err := bls.AggregatePublicKeys(activeRawPubkeys)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore
}
set := &bls.SignatureSet{
Messages: [][32]byte{sigRoot},
PublicKeys: []bls.PublicKey{aggKey},
Signatures: [][]byte{m.Message.Contribution.Signature},
}
return s.validateWithBatchVerifier(ctx, "sync contribution aggregate signature", set)
}
sig, err := bls.SignatureFromBytes(m.Message.Contribution.Signature)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject
}
verified := sig.Eth2FastAggregateVerify(activePubkeys, sigRoot)
if !verified {
return pubsub.ValidationReject
@@ -292,5 +330,25 @@ func (s *Service) verifySyncSelectionData(ctx context.Context, m *ethpb.Contribu
if err != nil {
return err
}
if features.Get().EnableBatchVerification {
publicKey, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
return err
}
root, err := helpers.ComputeSigningRoot(selectionData, domain)
if err != nil {
return err
}
set := &bls.SignatureSet{
Messages: [][32]byte{root},
PublicKeys: []bls.PublicKey{publicKey},
Signatures: [][]byte{m.SelectionProof},
}
valid := s.validateWithBatchVerifier(ctx, "sync contribution selection signature", set)
if valid != pubsub.ValidationAccept {
return errors.New("invalid sync selection proof provided")
}
return nil
}
return helpers.VerifySigningRoot(selectionData, pubkey[:], m.SelectionProof, domain)
}

View File

@@ -51,6 +51,7 @@ type Flags struct {
EnableOptimizedBalanceUpdate bool // EnableOptimizedBalanceUpdate uses an updated method of performing balance updates.
EnableDoppelGanger bool // EnableDoppelGanger enables doppelganger protection on startup for the validator.
EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables the saving of registry validators in separate buckets to save space
EnableBatchVerification bool // EnableBatchVerification enables batch signature verification on gossip messages.
// Logging related toggles.
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
@@ -213,6 +214,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
logDisabled(disableActiveBalanceCache)
cfg.EnableActiveBalanceCache = false
}
if ctx.Bool(enableBatchGossipVerification.Name) {
logEnabled(enableBatchGossipVerification)
cfg.EnableBatchVerification = true
}
Init(cfg)
}

View File

@@ -135,6 +135,10 @@ var (
Name: "disable-active-balance-cache",
Usage: "This disables active balance cache, which improves node performance during block processing",
}
enableBatchGossipVerification = &cli.BoolFlag{
Name: "enable-batch-gossip-verification",
Usage: "This enables batch verification of signatures received over gossip.",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -142,6 +146,7 @@ var devModeFlags = []cli.Flag{
enableLargerGossipHistory,
enableNextSlotStateCache,
forceOptMaxCoverAggregationStategy,
enableBatchGossipVerification,
}
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
@@ -193,6 +198,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
disableCorrectlyInsertOrphanedAtts,
disableCorrectlyPruneCanonicalAtts,
disableActiveBalanceCache,
enableBatchGossipVerification,
}...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.

View File

@@ -21,7 +21,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["bls_test.go"],
srcs = [
"bls_test.go",
"signature_set_test.go",
],
embed = [":go_default_library"],
deps = [
"//shared/bls/common:go_default_library",

View File

@@ -30,3 +30,27 @@ func (s *SignatureSet) Join(set *SignatureSet) *SignatureSet {
func (s *SignatureSet) Verify() (bool, error) {
return VerifyMultipleSignatures(s.Signatures, s.Messages, s.PublicKeys)
}
// Copy the attached signature set and return it
// to the caller.
func (s *SignatureSet) Copy() *SignatureSet {
signatures := make([][]byte, len(s.Signatures))
pubkeys := make([]PublicKey, len(s.PublicKeys))
messages := make([][32]byte, len(s.Messages))
for i := range s.Signatures {
sig := make([]byte, len(s.Signatures[i]))
copy(sig, s.Signatures[i])
signatures[i] = sig
}
for i := range s.PublicKeys {
pubkeys[i] = s.PublicKeys[i].Copy()
}
for i := range s.Messages {
copy(messages[i][:], s.Messages[i][:])
}
return &SignatureSet{
Signatures: signatures,
PublicKeys: pubkeys,
Messages: messages,
}
}

View File

@@ -0,0 +1,46 @@
package bls
import (
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestCopySignatureSet(t *testing.T) {
t.Run("blst", func(t *testing.T) {
key, err := RandKey()
assert.NoError(t, err)
key2, err := RandKey()
assert.NoError(t, err)
key3, err := RandKey()
assert.NoError(t, err)
message := [32]byte{'C', 'D'}
message2 := [32]byte{'E', 'F'}
message3 := [32]byte{'H', 'I'}
sig := key.Sign(message[:])
sig2 := key2.Sign(message2[:])
sig3 := key3.Sign(message3[:])
set := &SignatureSet{
Signatures: [][]byte{sig.Marshal()},
PublicKeys: []PublicKey{key.PublicKey()},
Messages: [][32]byte{message},
}
set2 := &SignatureSet{
Signatures: [][]byte{sig2.Marshal()},
PublicKeys: []PublicKey{key.PublicKey()},
Messages: [][32]byte{message},
}
set3 := &SignatureSet{
Signatures: [][]byte{sig3.Marshal()},
PublicKeys: []PublicKey{key.PublicKey()},
Messages: [][32]byte{message},
}
aggSet := set.Join(set2).Join(set3)
aggSet2 := aggSet.Copy()
assert.DeepEqual(t, aggSet, aggSet2)
})
}