From a103dd91c0006d544f400305501cd503c607f352 Mon Sep 17 00:00:00 2001 From: Mohamed Zahoor Date: Sun, 6 Mar 2022 13:39:12 +0530 Subject: [PATCH] cherry picked PR for #10274 (#10300) * ignore topic messages (except block topic) during optimistic sync * address review comments * nit pick fix Co-authored-by: Nishant Das --- beacon-chain/sync/validate_aggregate_proof.go | 10 ++++++ .../sync/validate_aggregate_proof_test.go | 32 +++++++++++++++++ .../sync/validate_attester_slashing.go | 10 ++++++ .../sync/validate_attester_slashing_test.go | 31 ++++++++++++++++ .../sync/validate_beacon_attestation.go | 11 ++++++ .../sync/validate_beacon_attestation_test.go | 34 ++++++++++++++++++ .../sync/validate_proposer_slashing.go | 10 ++++++ .../sync/validate_proposer_slashing_test.go | 30 ++++++++++++++++ .../sync/validate_sync_committee_message.go | 11 ++++++ .../validate_sync_committee_message_test.go | 32 +++++++++++++++++ .../sync/validate_sync_contribution_proof.go | 11 ++++++ .../validate_sync_contribution_proof_test.go | 35 ++++++++++++++++++- beacon-chain/sync/validate_voluntary_exit.go | 10 ++++++ .../sync/validate_voluntary_exit_test.go | 32 +++++++++++++++++ 14 files changed, 298 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 0bc5ceb362..05695bd8b0 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -41,6 +41,16 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return pubsub.ValidationIgnore, nil } + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + raw, err := s.decodePubsubMessage(msg) if err != nil { tracing.AnnotateError(span, err) diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index 0be661865e..3dd1265bad 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -686,3 +686,35 @@ func TestValidateAggregateAndProof_RejectWhenAttEpochDoesntEqualTargetEpoch(t *t assert.NotNil(t, err) assert.Equal(t, pubsub.ValidationReject, res) } + +func TestValidateAggregateAndProof_Optimistic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx := context.Background() + + exit, s := setupValidExit(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mock.ChainService{ + State: s, + Optimistic: true, + }, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, exit) + require.NoError(t, err) + topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)] + m := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateAggregateAndProof(ctx, "", m) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Validation should have ignored the message") +} diff --git a/beacon-chain/sync/validate_attester_slashing.go b/beacon-chain/sync/validate_attester_slashing.go index ab2a91fee7..faae937e42 100644 --- a/beacon-chain/sync/validate_attester_slashing.go +++ b/beacon-chain/sync/validate_attester_slashing.go @@ -26,6 +26,16 @@ func (s *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg return pubsub.ValidationIgnore, nil } + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validateAttesterSlashing") defer span.End() diff --git a/beacon-chain/sync/validate_attester_slashing_test.go b/beacon-chain/sync/validate_attester_slashing_test.go index fcb410cca0..449baa6990 100644 --- a/beacon-chain/sync/validate_attester_slashing_test.go +++ b/beacon-chain/sync/validate_attester_slashing_test.go @@ -293,3 +293,34 @@ func TestSeenAttesterSlashingIndices(t *testing.T) { assert.Equal(t, tc.seen, r.hasSeenAttesterSlashingIndices(tc.checkIndices1, tc.checkIndices2)) } } + +func TestValidateAttesterSlashing_Optimistic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx := context.Background() + + slashing, s := setupValidAttesterSlashing(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mock.ChainService{State: s, Optimistic: true}, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, slashing) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] + msg := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateAttesterSlashing(ctx, "foobar", msg) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Should have ignore this message") +} diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index a33f267230..519ce8da91 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -40,6 +40,17 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p if s.cfg.initialSync.Syncing() { return pubsub.ValidationIgnore, nil } + + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validateCommitteeIndexBeaconAttestation") defer span.End() diff --git a/beacon-chain/sync/validate_beacon_attestation_test.go b/beacon-chain/sync/validate_beacon_attestation_test.go index ad5f097818..aa1e28b925 100644 --- a/beacon-chain/sync/validate_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_beacon_attestation_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "reflect" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/signing" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" lruwrpr "github.com/prysmaticlabs/prysm/cache/lru" @@ -22,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" + "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" "github.com/prysmaticlabs/prysm/testing/util" ) @@ -298,3 +301,34 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { }) } } + +func TestServiceValidateCommitteeIndexBeaconAttestation_Optimistic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx := context.Background() + + slashing, s := setupValidAttesterSlashing(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mockChain.ChainService{State: s, Optimistic: true}, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, slashing) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] + msg := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateCommitteeIndexBeaconAttestation(ctx, "foobar", msg) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Should have ignore this message") +} diff --git a/beacon-chain/sync/validate_proposer_slashing.go b/beacon-chain/sync/validate_proposer_slashing.go index cadf24602a..c3593bfd78 100644 --- a/beacon-chain/sync/validate_proposer_slashing.go +++ b/beacon-chain/sync/validate_proposer_slashing.go @@ -26,6 +26,16 @@ func (s *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg return pubsub.ValidationIgnore, nil } + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validateProposerSlashing") defer span.End() diff --git a/beacon-chain/sync/validate_proposer_slashing_test.go b/beacon-chain/sync/validate_proposer_slashing_test.go index 21421d5616..cf9b35816d 100644 --- a/beacon-chain/sync/validate_proposer_slashing_test.go +++ b/beacon-chain/sync/validate_proposer_slashing_test.go @@ -209,3 +209,33 @@ func TestValidateProposerSlashing_Syncing(t *testing.T) { valid := res == pubsub.ValidationAccept assert.Equal(t, false, valid, "Did not fail validation") } + +func TestValidateProposerSlashing_Optimistic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx := context.Background() + + slashing, s := setupValidProposerSlashing(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mock.ChainService{State: s, Optimistic: true}, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, slashing) + require.NoError(t, err) + topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] + m := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateProposerSlashing(ctx, "", m) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Did not ignore the message") +} diff --git a/beacon-chain/sync/validate_sync_committee_message.go b/beacon-chain/sync/validate_sync_committee_message.go index aac66bf3f0..96bfe6434a 100644 --- a/beacon-chain/sync/validate_sync_committee_message.go +++ b/beacon-chain/sync/validate_sync_committee_message.go @@ -57,6 +57,17 @@ func (s *Service) validateSyncCommitteeMessage( if s.cfg.initialSync.Syncing() { return pubsub.ValidationIgnore, nil } + + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + if msg.Topic == nil { return pubsub.ValidationReject, errInvalidTopic } diff --git a/beacon-chain/sync/validate_sync_committee_message_test.go b/beacon-chain/sync/validate_sync_committee_message_test.go index 1aa9656498..9c215cc427 100644 --- a/beacon-chain/sync/validate_sync_committee_message_test.go +++ b/beacon-chain/sync/validate_sync_committee_message_test.go @@ -1,6 +1,7 @@ package sync import ( + "bytes" "context" "fmt" "reflect" @@ -561,3 +562,34 @@ func Test_ignoreEmptyCommittee(t *testing.T) { }) } } + +func TestValidateSyncCommitteeMessage_Optimistic(t *testing.T) { + p := mockp2p.NewTestP2P(t) + ctx := context.Background() + + slashing, s := setupValidAttesterSlashing(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mockChain.ChainService{State: s, Optimistic: true}, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, slashing) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] + msg := &pubsub.Message{ + Message: &pubsub_pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateCommitteeIndexBeaconAttestation(ctx, "foobar", msg) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Should have ignore this message") +} diff --git a/beacon-chain/sync/validate_sync_contribution_proof.go b/beacon-chain/sync/validate_sync_contribution_proof.go index c553f04c28..5f4b5eebe3 100644 --- a/beacon-chain/sync/validate_sync_contribution_proof.go +++ b/beacon-chain/sync/validate_sync_contribution_proof.go @@ -52,6 +52,17 @@ func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer if s.cfg.initialSync.Syncing() { return pubsub.ValidationIgnore, nil } + + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + m, err := s.readSyncContributionMessage(msg) if err != nil { tracing.AnnotateError(span, err) diff --git a/beacon-chain/sync/validate_sync_contribution_proof_test.go b/beacon-chain/sync/validate_sync_contribution_proof_test.go index 660f342f8b..a9796cfc94 100644 --- a/beacon-chain/sync/validate_sync_contribution_proof_test.go +++ b/beacon-chain/sync/validate_sync_contribution_proof_test.go @@ -1,8 +1,10 @@ package sync import ( + "bytes" "context" "fmt" + "reflect" "testing" "time" @@ -890,7 +892,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) { } } -func TestService_ValidateSyncContributionAndProof_Broadcast(t *testing.T) { +func TestValidateSyncContributionAndProof(t *testing.T) { ctx := context.Background() db := testingDB.SetupDB(t) headRoot, keys := fillUpBlocksAndState(ctx, t, db) @@ -1026,6 +1028,37 @@ func TestService_ValidateSyncContributionAndProof_Broadcast(t *testing.T) { } } +func TestValidateSyncContributionAndProof_Optimistic(t *testing.T) { + p := mockp2p.NewTestP2P(t) + ctx := context.Background() + + slashing, s := setupValidAttesterSlashing(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mockChain.ChainService{State: s, Optimistic: true}, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, slashing) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] + msg := &pubsub.Message{ + Message: &pubsub_pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateCommitteeIndexBeaconAttestation(ctx, "foobar", msg) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Should have ignore this message") +} + func fillUpBlocksAndState(ctx context.Context, t *testing.T, beaconDB db.Database) ([32]byte, []bls.SecretKey) { gs, keys := util.DeterministicGenesisStateAltair(t, 64) sCom, err := altair.NextSyncCommittee(ctx, gs) diff --git a/beacon-chain/sync/validate_voluntary_exit.go b/beacon-chain/sync/validate_voluntary_exit.go index a55a78cb43..8f3884d0a8 100644 --- a/beacon-chain/sync/validate_voluntary_exit.go +++ b/beacon-chain/sync/validate_voluntary_exit.go @@ -29,6 +29,16 @@ func (s *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *p return pubsub.ValidationIgnore, nil } + // We should not attempt to process this message if the node is running in optimistic mode. + // We just ignore in p2p so that the peer is not penalized. + optimistic, err := s.cfg.chain.IsOptimistic(ctx) + if err != nil { + return pubsub.ValidationReject, err + } + if optimistic { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validateVoluntaryExit") defer span.End() diff --git a/beacon-chain/sync/validate_voluntary_exit_test.go b/beacon-chain/sync/validate_voluntary_exit_test.go index 5f3eabc63f..bb6e393686 100644 --- a/beacon-chain/sync/validate_voluntary_exit_test.go +++ b/beacon-chain/sync/validate_voluntary_exit_test.go @@ -195,3 +195,35 @@ func TestValidateVoluntaryExit_ValidExit_Syncing(t *testing.T) { valid := res == pubsub.ValidationAccept assert.Equal(t, false, valid, "Validation should have failed") } + +func TestValidateVoluntaryExit_Optimistic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + ctx := context.Background() + + exit, s := setupValidExit(t) + + r := &Service{ + cfg: &config{ + p2p: p, + chain: &mock.ChainService{ + State: s, + Optimistic: true, + }, + initialSync: &mockSync.Sync{IsSyncing: false}, + }, + } + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, exit) + require.NoError(t, err) + topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)] + m := &pubsub.Message{ + Message: &pubsubpb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }, + } + res, err := r.validateVoluntaryExit(ctx, "", m) + assert.NoError(t, err) + valid := res == pubsub.ValidationIgnore + assert.Equal(t, true, valid, "Validation should have ignored the message") +}