Compare commits

...

1 Commits

Author SHA1 Message Date
Manu NALEPA
87961432c6 registerSubscribers: Split in small functions, standardize name and add comments.
No functional change.
2024-09-03 18:20:38 +02:00
35 changed files with 297 additions and 241 deletions

View File

@@ -402,7 +402,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
}
func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
return fmt.Sprintf(BeaconAttestationSubnetTopicFormat, forkDigest, subnet)
}
func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {

View File

@@ -103,8 +103,8 @@ func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
}
func TestService_Attestation_Subnet(t *testing.T) {
if gtm := GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]; gtm != AttestationSubnetTopicFormat {
t.Errorf("Constant is out of date. Wanted %s, got %s", AttestationSubnetTopicFormat, gtm)
if gtm := GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]; gtm != BeaconAttestationSubnetTopicFormat {
t.Errorf("Constant is out of date. Wanted %s, got %s", BeaconAttestationSubnetTopicFormat, gtm)
}
tests := []struct {
@@ -170,7 +170,7 @@ func TestService_BroadcastAttestation(t *testing.T) {
msg := util.HydrateAttestation(&ethpb.Attestation{AggregationBits: bitfield.NewBitlist(7)})
subnet := uint64(5)
topic := AttestationSubnetTopicFormat
topic := BeaconAttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(msg)] = topic
digest, err := p.currentForkDigest()
require.NoError(t, err)
@@ -332,7 +332,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
go p2.listenForNewNodes()
msg := util.HydrateAttestation(&ethpb.Attestation{AggregationBits: bitfield.NewBitlist(7)})
topic := AttestationSubnetTopicFormat
topic := BeaconAttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(msg)] = topic
digest, err := p.currentForkDigest()
require.NoError(t, err)

View File

@@ -12,24 +12,24 @@ import (
// gossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var gossipTopicMappings = map[string]func() proto.Message{
BlockSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBeaconBlock{} },
AttestationSubnetTopicFormat: func() proto.Message { return &ethpb.Attestation{} },
ExitSubnetTopicFormat: func() proto.Message { return &ethpb.SignedVoluntaryExit{} },
ProposerSlashingSubnetTopicFormat: func() proto.Message { return &ethpb.ProposerSlashing{} },
AttesterSlashingSubnetTopicFormat: func() proto.Message { return &ethpb.AttesterSlashing{} },
AggregateAndProofSubnetTopicFormat: func() proto.Message { return &ethpb.SignedAggregateAttestationAndProof{} },
SyncContributionAndProofSubnetTopicFormat: func() proto.Message { return &ethpb.SignedContributionAndProof{} },
SyncCommitteeSubnetTopicFormat: func() proto.Message { return &ethpb.SyncCommitteeMessage{} },
BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBLSToExecutionChange{} },
BlobSubnetTopicFormat: func() proto.Message { return &ethpb.BlobSidecar{} },
DataColumnSubnetTopicFormat: func() proto.Message { return &ethpb.DataColumnSidecar{} },
BeaconBlockSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBeaconBlock{} },
BeaconAttestationSubnetTopicFormat: func() proto.Message { return &ethpb.Attestation{} },
VoluntaryExitSubnetTopicFormat: func() proto.Message { return &ethpb.SignedVoluntaryExit{} },
ProposerSlashingSubnetTopicFormat: func() proto.Message { return &ethpb.ProposerSlashing{} },
AttesterSlashingSubnetTopicFormat: func() proto.Message { return &ethpb.AttesterSlashing{} },
BeaconAggregateAndProofSubnetTopicFormat: func() proto.Message { return &ethpb.SignedAggregateAttestationAndProof{} },
SyncCommitteeContributionAndProofSubnetTopicFormat: func() proto.Message { return &ethpb.SignedContributionAndProof{} },
SyncCommitteeSubnetTopicFormat: func() proto.Message { return &ethpb.SyncCommitteeMessage{} },
BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBLSToExecutionChange{} },
BlobSubnetTopicFormat: func() proto.Message { return &ethpb.BlobSidecar{} },
DataColumnSubnetTopicFormat: func() proto.Message { return &ethpb.DataColumnSidecar{} },
}
// GossipTopicMappings is a function to return the assigned data type
// versioned by epoch.
func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message {
switch topic {
case BlockSubnetTopicFormat:
case BeaconBlockSubnetTopicFormat:
if epoch >= params.BeaconConfig().ElectraForkEpoch {
return &ethpb.SignedBeaconBlockElectra{}
}
@@ -46,7 +46,7 @@ func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message {
return &ethpb.SignedBeaconBlockAltair{}
}
return gossipMessage(topic)
case AttestationSubnetTopicFormat:
case BeaconAttestationSubnetTopicFormat:
if epoch >= params.BeaconConfig().ElectraForkEpoch {
return &ethpb.AttestationElectra{}
}
@@ -56,7 +56,7 @@ func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message {
return &ethpb.AttesterSlashingElectra{}
}
return gossipMessage(topic)
case AggregateAndProofSubnetTopicFormat:
case BeaconAggregateAndProofSubnetTopicFormat:
if epoch >= params.BeaconConfig().ElectraForkEpoch {
return &ethpb.SignedAggregateAttestationAndProofElectra{}
}
@@ -93,16 +93,16 @@ func init() {
GossipTypeMapping[reflect.TypeOf(v())] = k
}
// Specially handle Altair objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockAltair{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockAltair{})] = BeaconBlockSubnetTopicFormat
// Specially handle Bellatrix objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockBellatrix{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockBellatrix{})] = BeaconBlockSubnetTopicFormat
// Specially handle Capella objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockCapella{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockCapella{})] = BeaconBlockSubnetTopicFormat
// Specially handle Deneb objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockDeneb{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockDeneb{})] = BeaconBlockSubnetTopicFormat
// Specially handle Electra objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockElectra{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttestationElectra{})] = AttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockElectra{})] = BeaconBlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttestationElectra{})] = BeaconAttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttesterSlashingElectra{})] = AttesterSlashingSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedAggregateAttestationAndProofElectra{})] = BeaconAggregateAndProofSubnetTopicFormat
}

View File

@@ -44,86 +44,86 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) {
params.OverrideBeaconConfig(bCfg)
// Phase 0
pMessage := GossipTopicMappings(BlockSubnetTopicFormat, 0)
pMessage := GossipTopicMappings(BeaconBlockSubnetTopicFormat, 0)
_, ok := pMessage.(*ethpb.SignedBeaconBlock)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, 0)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, 0)
_, ok = pMessage.(*ethpb.Attestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, 0)
_, ok = pMessage.(*ethpb.AttesterSlashing)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, 0)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, 0)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof)
assert.Equal(t, true, ok)
// Altair Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, altairForkEpoch)
pMessage = GossipTopicMappings(BeaconBlockSubnetTopicFormat, altairForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockAltair)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, altairForkEpoch)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, altairForkEpoch)
_, ok = pMessage.(*ethpb.Attestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, altairForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashing)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, altairForkEpoch)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, altairForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof)
assert.Equal(t, true, ok)
// Bellatrix Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, bellatrixForkEpoch)
pMessage = GossipTopicMappings(BeaconBlockSubnetTopicFormat, bellatrixForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockBellatrix)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, bellatrixForkEpoch)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, bellatrixForkEpoch)
_, ok = pMessage.(*ethpb.Attestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, bellatrixForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashing)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, bellatrixForkEpoch)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, bellatrixForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof)
assert.Equal(t, true, ok)
// Capella Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, capellaForkEpoch)
pMessage = GossipTopicMappings(BeaconBlockSubnetTopicFormat, capellaForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockCapella)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, capellaForkEpoch)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, capellaForkEpoch)
_, ok = pMessage.(*ethpb.Attestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, capellaForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashing)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, capellaForkEpoch)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, capellaForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof)
assert.Equal(t, true, ok)
// Deneb Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, denebForkEpoch)
pMessage = GossipTopicMappings(BeaconBlockSubnetTopicFormat, denebForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockDeneb)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, denebForkEpoch)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, denebForkEpoch)
_, ok = pMessage.(*ethpb.Attestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, denebForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashing)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, denebForkEpoch)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, denebForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof)
assert.Equal(t, true, ok)
// Electra Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, electraForkEpoch)
pMessage = GossipTopicMappings(BeaconBlockSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockElectra)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, electraForkEpoch)
pMessage = GossipTopicMappings(BeaconAttestationSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.AttestationElectra)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashingElectra)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, electraForkEpoch)
pMessage = GossipTopicMappings(BeaconAggregateAndProofSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProofElectra)
assert.Equal(t, true, ok)
}

View File

@@ -21,7 +21,7 @@ func TestMsgID_HashesCorrectly(t *testing.T) {
genesisValidatorsRoot := bytesutil.PadTo([]byte{'A'}, 32)
d, err := forks.CreateForkDigest(time.Now(), genesisValidatorsRoot)
assert.NoError(t, err)
tpc := fmt.Sprintf(p2p.BlockSubnetTopicFormat, d)
tpc := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, d)
invalidSnappy := [32]byte{'J', 'U', 'N', 'K'}
pMsg := &pubsubpb.Message{Data: invalidSnappy[:], Topic: &tpc}
hashedData := hash.Hash(append(params.BeaconConfig().MessageDomainInvalidSnappy[:], pMsg.Data...))
@@ -41,7 +41,7 @@ func TestMessageIDFunction_HashesCorrectlyAltair(t *testing.T) {
genesisValidatorsRoot := bytesutil.PadTo([]byte{'A'}, 32)
d, err := signing.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, genesisValidatorsRoot)
assert.NoError(t, err)
tpc := fmt.Sprintf(p2p.BlockSubnetTopicFormat, d)
tpc := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, d)
topicLen := uint64(len(tpc))
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(topicLen)
invalidSnappy := [32]byte{'J', 'U', 'N', 'K'}
@@ -71,7 +71,7 @@ func TestMessageIDFunction_HashesCorrectlyBellatrix(t *testing.T) {
genesisValidatorsRoot := bytesutil.PadTo([]byte{'A'}, 32)
d, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot)
assert.NoError(t, err)
tpc := fmt.Sprintf(p2p.BlockSubnetTopicFormat, d)
tpc := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, d)
topicLen := uint64(len(tpc))
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(topicLen)
invalidSnappy := [32]byte{'J', 'U', 'N', 'K'}

View File

@@ -35,22 +35,22 @@ func TestService_CanSubscribe(t *testing.T) {
tests := []test{
{
name: "block topic on current fork",
topic: fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix,
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix,
want: true,
},
{
name: "block topic on unknown fork",
topic: fmt.Sprintf(BlockSubnetTopicFormat, [4]byte{0xFF, 0xEE, 0x56, 0x21}) + validProtocolSuffix,
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, [4]byte{0xFF, 0xEE, 0x56, 0x21}) + validProtocolSuffix,
want: false,
},
{
name: "block topic missing protocol suffix",
topic: fmt.Sprintf(BlockSubnetTopicFormat, currentFork),
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, currentFork),
want: false,
},
{
name: "block topic wrong protocol suffix",
topic: fmt.Sprintf(BlockSubnetTopicFormat, currentFork) + "/foobar",
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, currentFork) + "/foobar",
want: false,
},
{
@@ -75,12 +75,12 @@ func TestService_CanSubscribe(t *testing.T) {
},
{
name: "att subnet topic on current fork",
topic: fmt.Sprintf(AttestationSubnetTopicFormat, digest, 55 /*subnet*/) + validProtocolSuffix,
topic: fmt.Sprintf(BeaconAttestationSubnetTopicFormat, digest, 55 /*subnet*/) + validProtocolSuffix,
want: true,
},
{
name: "att subnet topic on unknown fork",
topic: fmt.Sprintf(AttestationSubnetTopicFormat, [4]byte{0xCC, 0xBB, 0xAA, 0xA1} /*fork digest*/, 54 /*subnet*/) + validProtocolSuffix,
topic: fmt.Sprintf(BeaconAttestationSubnetTopicFormat, [4]byte{0xCC, 0xBB, 0xAA, 0xA1} /*fork digest*/, 54 /*subnet*/) + validProtocolSuffix,
want: false,
},
}
@@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) {
formatting := []interface{}{digest}
// Special case for attestation subnets which have a second formatting placeholder.
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat || topic == DataColumnSubnetTopicFormat {
if topic == BeaconAttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat || topic == DataColumnSubnetTopicFormat {
formatting = append(formatting, 0 /* some subnet ID */)
}
@@ -252,7 +252,7 @@ func TestService_FilterIncomingSubscriptions(t *testing.T) {
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix
s := fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix
return &s
}(),
},
@@ -266,7 +266,7 @@ func TestService_FilterIncomingSubscriptions(t *testing.T) {
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix
s := fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix
return &s
}(),
},
@@ -282,7 +282,7 @@ func TestService_FilterIncomingSubscriptions(t *testing.T) {
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix
s := fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix
return &s
}(),
},
@@ -292,7 +292,7 @@ func TestService_FilterIncomingSubscriptions(t *testing.T) {
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix
s := fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix
return &s
}(),
},
@@ -306,7 +306,7 @@ func TestService_FilterIncomingSubscriptions(t *testing.T) {
return &b
}(),
Topicid: func() *string {
s := fmt.Sprintf(BlockSubnetTopicFormat, digest) + validProtocolSuffix
s := fmt.Sprintf(BeaconBlockSubnetTopicFormat, digest) + validProtocolSuffix
return &s
}(),
},

View File

@@ -11,7 +11,7 @@ import (
)
func FuzzMsgID(f *testing.F) {
validTopic := fmt.Sprintf(p2p.BlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy
validTopic := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy
f.Add(validTopic)
f.Fuzz(func(t *testing.T, topic string) {

View File

@@ -40,7 +40,7 @@ func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) {
s.host = p0.BHost
s.pubsub = p0.PubSub()
topic := fmt.Sprintf(BlockSubnetTopicFormat, fd) + "/" + encoder.ProtocolSuffixSSZSnappy
topic := fmt.Sprintf(BeaconBlockSubnetTopicFormat, fd) + "/" + encoder.ProtocolSuffixSSZSnappy
// Establish the remote peer to be subscribed to the outgoing topic.
_, err = p1.SubscribeToTopic(topic)
@@ -95,7 +95,7 @@ func TestExtractGossipDigest(t *testing.T) {
},
{
name: "short digest",
topic: fmt.Sprintf(BlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f}) + "/" + encoder.ProtocolSuffixSSZSnappy,
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f}) + "/" + encoder.ProtocolSuffixSSZSnappy,
want: [4]byte{},
wantErr: true,
error: errors.New("invalid digest length wanted"),
@@ -109,7 +109,7 @@ func TestExtractGossipDigest(t *testing.T) {
},
{
name: "valid topic",
topic: fmt.Sprintf(BlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy,
topic: fmt.Sprintf(BeaconBlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy,
want: [4]byte{0xb5, 0x30, 0x3f, 0x2a},
wantErr: false,
error: nil,
@@ -128,7 +128,7 @@ func TestExtractGossipDigest(t *testing.T) {
}
func BenchmarkExtractGossipDigest(b *testing.B) {
topic := fmt.Sprintf(BlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy
topic := fmt.Sprintf(BeaconBlockSubnetTopicFormat, []byte{0xb5, 0x30, 0x3f, 0x2a}) + "/" + encoder.ProtocolSuffixSSZSnappy
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := ExtractGossipDigest(topic)

View File

@@ -313,7 +313,7 @@ func TestService_JoinLeaveTopic(t *testing.T) {
assert.Equal(t, 0, len(s.joinedTopics))
topic := fmt.Sprintf(AttestationSubnetTopicFormat, fd, 42) + "/" + encoder.ProtocolSuffixSSZSnappy
topic := fmt.Sprintf(BeaconAttestationSubnetTopicFormat, fd, 42) + "/" + encoder.ProtocolSuffixSSZSnappy
topicHandle, err := s.JoinTopic(topic)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.joinedTopics))

View File

@@ -113,7 +113,7 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
service.dv5Listener.LocalNode().Set(entry)
// Join and subscribe to the subnet, needed by libp2p.
topic, err := service.pubsub.Join(fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + "/ssz_snappy")
topic, err := service.pubsub.Join(fmt.Sprintf(BeaconAttestationSubnetTopicFormat, bootNodeForkDigest, subnet) + "/ssz_snappy")
require.NoError(t, err)
_, err = topic.Subscribe()
@@ -155,7 +155,7 @@ func TestStartDiscV5_FindPeersWithSubnet(t *testing.T) {
exists := make([]bool, 0, 3)
for i := 1; i <= 3; i++ {
subnet := uint64(i)
topic := fmt.Sprintf(AttestationSubnetTopicFormat, bootNodeForkDigest, subnet)
topic := fmt.Sprintf(BeaconAttestationSubnetTopicFormat, bootNodeForkDigest, subnet)
exist := false

View File

@@ -35,22 +35,22 @@ const (
// Topic Formats
//
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
AttestationSubnetTopicFormat = GossipProtocolAndDigest + GossipAttestationMessage + "_%d"
// BeaconAttestationSubnetTopicFormat is the topic format for the attestation subnet.
BeaconAttestationSubnetTopicFormat = GossipProtocolAndDigest + GossipAttestationMessage + "_%d"
// SyncCommitteeSubnetTopicFormat is the topic format for the sync committee subnet.
SyncCommitteeSubnetTopicFormat = GossipProtocolAndDigest + GossipSyncCommitteeMessage + "_%d"
// BlockSubnetTopicFormat is the topic format for the block subnet.
BlockSubnetTopicFormat = GossipProtocolAndDigest + GossipBlockMessage
// ExitSubnetTopicFormat is the topic format for the voluntary exit subnet.
ExitSubnetTopicFormat = GossipProtocolAndDigest + GossipExitMessage
// BeaconBlockSubnetTopicFormat is the topic format for the block subnet.
BeaconBlockSubnetTopicFormat = GossipProtocolAndDigest + GossipBlockMessage
// VoluntaryExitSubnetTopicFormat is the topic format for the voluntary exit subnet.
VoluntaryExitSubnetTopicFormat = GossipProtocolAndDigest + GossipExitMessage
// ProposerSlashingSubnetTopicFormat is the topic format for the proposer slashing subnet.
ProposerSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipProposerSlashingMessage
// AttesterSlashingSubnetTopicFormat is the topic format for the attester slashing subnet.
AttesterSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipAttesterSlashingMessage
// AggregateAndProofSubnetTopicFormat is the topic format for the aggregate and proof subnet.
AggregateAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipAggregateAndProofMessage
// SyncContributionAndProofSubnetTopicFormat is the topic format for the sync aggregate and proof subnet.
SyncContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage
// BeaconAggregateAndProofSubnetTopicFormat is the topic format for the aggregate and proof subnet.
BeaconAggregateAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipAggregateAndProofMessage
// SyncCommitteeContributionAndProofSubnetTopicFormat is the topic format for the sync aggregate and proof subnet.
SyncCommitteeContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage
// BlsToExecutionChangeSubnetTopicFormat is the topic format for the bls to execution change subnet.
BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
// BlobSubnetTopicFormat is the topic format for the blob subnet.

View File

@@ -41,7 +41,7 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
// Given that both sync message related subnets have the same message name, we have to
// differentiate them below.
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage) && !strings.Contains(topic, p2p.SyncContributionAndProofSubnetTopicFormat):
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage) && !strings.Contains(topic, p2p.SyncCommitteeContributionAndProofSubnetTopicFormat):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.BlobSidecar{})]
@@ -83,11 +83,11 @@ func (*Service) replaceForkDigest(topic string) (string, error) {
func extractValidDataTypeFromTopic(topic string, digest []byte, clock *startup.Clock) (ssz.Unmarshaler, error) {
switch topic {
case p2p.BlockSubnetTopicFormat:
case p2p.BeaconBlockSubnetTopicFormat:
return extractDataTypeFromTypeMap(types.BlockMap, digest, clock)
case p2p.AttestationSubnetTopicFormat:
case p2p.BeaconAttestationSubnetTopicFormat:
return extractDataTypeFromTypeMap(types.AttestationMap, digest, clock)
case p2p.AggregateAndProofSubnetTopicFormat:
case p2p.BeaconAggregateAndProofSubnetTopicFormat:
return extractDataTypeFromTypeMap(types.AggregateAttestationMap, digest, clock)
}
return nil, nil

View File

@@ -54,117 +54,173 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
return pubsub.ValidationAccept, nil
}
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// registerSubscribersFromGenesis registers subscribers for subnets needed from genesis.
// It includes: beacon block, beacon aggregate and proof, voluntary exit, proposer slashing, attester slashing and beacon attestation.
func (s *Service) registerSubscribersFromGenesis(digest [4]byte) {
// Beacon block
s.subscribe(
p2p.BlockSubnetTopicFormat,
s.validateBeaconBlockPubSub,
p2p.BeaconBlockSubnetTopicFormat,
s.validateBeaconBlockPubSubMsg,
s.beaconBlockSubscriber,
digest,
)
// Beacon aggregate and proof
s.subscribe(
p2p.AggregateAndProofSubnetTopicFormat,
s.validateAggregateAndProof,
s.beaconAggregateProofSubscriber,
p2p.BeaconAggregateAndProofSubnetTopicFormat,
s.validateBeaconAggregateAndProofPubSubMsg,
s.beaconAggregateAndProofSubscriber,
digest,
)
// Voluntary exit
s.subscribe(
p2p.ExitSubnetTopicFormat,
s.validateVoluntaryExit,
p2p.VoluntaryExitSubnetTopicFormat,
s.validateVoluntaryExitPubSubMsg,
s.voluntaryExitSubscriber,
digest,
)
// Proposer slashing
s.subscribe(
p2p.ProposerSlashingSubnetTopicFormat,
s.validateProposerSlashing,
s.validateProposerSlashingPubSubMsg,
s.proposerSlashingSubscriber,
digest,
)
// Attester slashing
s.subscribe(
p2p.AttesterSlashingSubnetTopicFormat,
s.validateAttesterSlashing,
s.validateAttesterSlashingPubSubMsg,
s.attesterSlashingSubscriber,
digest,
)
// Beacon attestation -- subscribe to all subnets.
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
p2p.BeaconAttestationSubnetTopicFormat,
s.validateBeaconAttestationPubSubMsg,
s.beaconAttestationSubscriber,
digest,
params.BeaconConfig().AttestationSubnetCount,
)
} else {
s.subscribeDynamicWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
return
}
// Beacon attestation -- subscribe to required subnets.
s.subscribeDynamicWithSubnets(
p2p.BeaconAttestationSubnetTopicFormat,
s.validateBeaconAttestationPubSubMsg,
s.beaconAttestationSubscriber,
digest,
)
}
// registerSubscribersFromAltair registers subscribers for subnets needed from the Altair hard fork.
// It includes: sync committee contribution and proof and sync committee.
func (s *Service) registerSubscribersFromAltair(digest [4]byte) {
// Sync committee contribution and proof
s.subscribe(
p2p.SyncCommitteeContributionAndProofSubnetTopicFormat,
s.validateSyncCommitteeContributionAndProofPubSubMsg,
s.syncCommitteeContributionAndProofSubscriber,
digest,
)
// Sync committee -- subscribe to all subnets.
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
)
return
}
// Altair Fork Version
// Sync committee -- subscribe to required subnets.
s.subscribeDynamicWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
)
}
// registerSubscribersFromCapella registers subscribers for subnets needed from the altair fork.
// It includes: BLS to execution change
func (s *Service) registerSubscribersFromCapella(digest [4]byte) {
// BLS to execution change
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
}
// registerSubscribersFromDeneb registers subscribers for subnets needed from the deneb fork.
// It includes: Blob sidecar and data column sidecar (depending of the peerDAS status).
func (s *Service) registerSubscribersFromDeneb(epoch primitives.Epoch, digest [4]byte) {
peerDasIsActive := coreTime.PeerDASIsActive(slots.UnsafeEpochStart(epoch))
if !peerDasIsActive {
// PeerDAS is not yet active, blob sidecar
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob,
s.blobSubscriber,
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
return
}
// PeerDAS is active, data columns sidecar -- subscribe to all subnets.
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
params.BeaconConfig().DataColumnSidecarSubnetCount,
)
return
}
// PeerDAS is active, data columns sidecar -- subscribe to required subnets.
s.subscribeDynamicWithColumnSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
)
}
// registerSubscribers registers subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// From genesis
s.registerSubscribersFromGenesis(digest)
// From Altair hard fork
if epoch >= params.BeaconConfig().AltairForkEpoch {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
digest,
)
} else {
s.subscribeDynamicWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
digest,
)
}
s.registerSubscribersFromAltair(digest)
}
// New Gossip Topic in Capella
// From Capella hard fork
if epoch >= params.BeaconConfig().CapellaForkEpoch {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
digest,
)
s.registerSubscribersFromCapella(digest)
}
// New Gossip Topic in Deneb
// From Debeb hard fork
if epoch >= params.BeaconConfig().DenebForkEpoch {
if coreTime.PeerDASIsActive(slots.UnsafeEpochStart(epoch)) {
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
params.BeaconConfig().DataColumnSidecarSubnetCount,
)
} else {
s.subscribeDynamicWithColumnSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
)
}
} else {
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
s.registerSubscribersFromDeneb(epoch, digest)
}
}

View File

@@ -10,9 +10,9 @@ import (
"google.golang.org/protobuf/proto"
)
// beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the
// beaconAggregateAndProofSubscriber forwards the incoming validated aggregated attestation and proof to the
// attestation pool for processing.
func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Message) error {
func (s *Service) beaconAggregateAndProofSubscriber(_ context.Context, msg proto.Message) error {
a, ok := msg.(ethpb.SignedAggregateAttAndProof)
if !ok {
return fmt.Errorf("message was not type ethpb.SignedAggregateAttAndProof, type=%T", msg)

View File

@@ -33,7 +33,7 @@ func TestBeaconAggregateProofSubscriber_CanSaveAggregatedAttestation(t *testing.
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
}
require.NoError(t, r.beaconAggregateProofSubscriber(context.Background(), a))
require.NoError(t, r.beaconAggregateAndProofSubscriber(context.Background(), a))
assert.DeepSSZEqual(t, []ethpb.Att{a.Message.Aggregate}, r.cfg.attPool.AggregatedAttestations(), "Did not save aggregated attestation")
}
@@ -55,7 +55,7 @@ func TestBeaconAggregateProofSubscriber_CanSaveUnaggregatedAttestation(t *testin
AggregatorIndex: 100,
},
}
require.NoError(t, r.beaconAggregateProofSubscriber(context.Background(), a))
require.NoError(t, r.beaconAggregateAndProofSubscriber(context.Background(), a))
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)

View File

@@ -14,7 +14,7 @@ import (
"google.golang.org/protobuf/proto"
)
func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, msg proto.Message) error {
func (s *Service) beaconAttestationSubscriber(_ context.Context, msg proto.Message) error {
a, ok := msg.(eth.Att)
if !ok {
return fmt.Errorf("message was not type eth.Att, type=%T", msg)

View File

@@ -9,10 +9,10 @@ import (
"google.golang.org/protobuf/proto"
)
// syncContributionAndProofSubscriber forwards the incoming validated sync contributions and proof to the
// syncCommitteeContributionAndProofSubscriber forwards the incoming validated sync contributions and proof to the
// contribution pool for processing.
// skipcq: SCC-U1000
func (s *Service) syncContributionAndProofSubscriber(_ context.Context, msg proto.Message) error {
func (s *Service) syncCommitteeContributionAndProofSubscriber(_ context.Context, msg proto.Message) error {
sContr, ok := msg.(*ethpb.SignedContributionAndProof)
if !ok {
return fmt.Errorf("message was not type *ethpb.SignedContributionAndProof, type=%T", msg)

View File

@@ -350,7 +350,7 @@ func Test_wrapAndReportValidation(t *testing.T) {
}
fd, err := forks.CreateForkDigest(mChain.GenesisTime(), mChain.ValidatorsRoot[:])
assert.NoError(t, err)
mockTopic := fmt.Sprintf(p2p.BlockSubnetTopicFormat, fd) + encoder.SszNetworkEncoder{}.ProtocolSuffix()
mockTopic := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, fd) + encoder.SszNetworkEncoder{}.ProtocolSuffix()
type args struct {
topic string
v wrappedVal

View File

@@ -23,7 +23,7 @@ func TestSubTopicHandler_CRUD(t *testing.T) {
enc := encoder.SszNetworkEncoder{}
// Valid topic added in.
topic := fmt.Sprintf(p2p.BlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic := fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
assert.Equal(t, true, h.digestExists(digest))
@@ -36,11 +36,11 @@ func TestSubTopicHandler_CRUD(t *testing.T) {
h = newSubTopicHandler()
// Multiple Topics added in.
topic = fmt.Sprintf(p2p.BlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
topic = fmt.Sprintf(p2p.ExitSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.VoluntaryExitSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
@@ -52,11 +52,11 @@ func TestSubTopicHandler_CRUD(t *testing.T) {
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
topic = fmt.Sprintf(p2p.AggregateAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.BeaconAggregateAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
topic = fmt.Sprintf(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.SyncCommitteeContributionAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.addTopic(topic, new(pubsub.Subscription))
assert.Equal(t, true, h.topicExists(topic))
@@ -71,7 +71,7 @@ func TestSubTopicHandler_CRUD(t *testing.T) {
h.removeTopic(topic)
assert.Equal(t, false, h.topicExists(topic))
topic = fmt.Sprintf(p2p.ExitSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.VoluntaryExitSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.removeTopic(topic)
assert.Equal(t, false, h.topicExists(topic))
@@ -83,15 +83,15 @@ func TestSubTopicHandler_CRUD(t *testing.T) {
assert.Equal(t, 4, len(h.allTopics()))
// Remove remaining topics.
topic = fmt.Sprintf(p2p.BlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.BeaconBlockSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.removeTopic(topic)
assert.Equal(t, false, h.topicExists(topic))
topic = fmt.Sprintf(p2p.AggregateAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.BeaconAggregateAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.removeTopic(topic)
assert.Equal(t, false, h.topicExists(topic))
topic = fmt.Sprintf(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
topic = fmt.Sprintf(p2p.SyncCommitteeContributionAndProofSubnetTopicFormat, digest) + enc.ProtocolSuffix()
h.removeTopic(topic)
assert.Equal(t, false, h.topicExists(topic))

View File

@@ -108,7 +108,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) {
Topic: &strTop,
},
}
_, err := r.validateBeaconBlockPubSub(ctx, peer.ID(pid), msg)
_, err := r.validateBeaconBlockPubSubMsg(ctx, peer.ID(pid), msg)
_ = err
})
}
@@ -190,7 +190,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) {
Topic: &strTop,
},
}
_, err := r.validateBeaconBlockPubSub(ctx, peer.ID(pid), msg)
_, err := r.validateBeaconBlockPubSubMsg(ctx, peer.ID(pid), msg)
_ = err
})
}
@@ -272,7 +272,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) {
Topic: &strTop,
},
}
_, err := r.validateBeaconBlockPubSub(ctx, peer.ID(pid), msg)
_, err := r.validateBeaconBlockPubSubMsg(ctx, peer.ID(pid), msg)
_ = err
})
}

View File

@@ -26,9 +26,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
// validateAggregateAndProof verifies the aggregated signature and the selection proof is valid before forwarding to the
// validateBeaconAggregateAndProofPubSubMsg verifies the aggregated signature and the selection proof is valid before forwarding to the
// network and downstream services.
func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateBeaconAggregateAndProofPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil

View File

@@ -185,7 +185,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) {
},
}
if res, err := r.validateAggregateAndProof(context.Background(), "", msg); res == pubsub.ValidationAccept {
if res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Error("Expected validate to fail")
}
@@ -255,7 +255,7 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) {
},
}
if res, err := r.validateAggregateAndProof(context.Background(), "", msg); res == pubsub.ValidationAccept {
if res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Error("Expected validate to fail")
}
@@ -272,7 +272,7 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) {
Topic: &topic,
},
}
if res, err := r.validateAggregateAndProof(context.Background(), "", msg); res == pubsub.ValidationAccept {
if res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Error("Expected validate to fail")
}
@@ -338,7 +338,7 @@ func TestValidateAggregateAndProof_ExistedInPool(t *testing.T) {
}
require.NoError(t, r.cfg.attPool.SaveBlockAttestation(att))
if res, err := r.validateAggregateAndProof(context.Background(), "", msg); res == pubsub.ValidationAccept {
if res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Error("Expected validate to fail")
}
@@ -442,7 +442,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAggregateAndProof(context.Background(), "", msg)
res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg)
assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationAccept, res, "Validated status is false")
assert.NotNil(t, msg.ValidatorData, "Did not set validator data")
@@ -545,7 +545,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAggregateAndProof(context.Background(), "", msg)
res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg)
assert.NoError(t, err)
require.Equal(t, pubsub.ValidationAccept, res, "Validated status is false")
@@ -562,7 +562,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
}
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
if res, err := r.validateAggregateAndProof(context.Background(), "", msg); res == pubsub.ValidationAccept {
if res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg); res == pubsub.ValidationAccept {
_ = err
t.Fatal("Validated status is true")
}
@@ -654,7 +654,7 @@ func TestValidateAggregateAndProof_BadBlock(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAggregateAndProof(context.Background(), "", msg)
res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg)
assert.NotNil(t, err)
assert.Equal(t, pubsub.ValidationReject, res, "Validated status is true")
}
@@ -744,7 +744,7 @@ func TestValidateAggregateAndProof_RejectWhenAttEpochDoesntEqualTargetEpoch(t *t
Topic: &topic,
},
}
res, err := r.validateAggregateAndProof(context.Background(), "", msg)
res, err := r.validateBeaconAggregateAndProofPubSubMsg(context.Background(), "", msg)
assert.NotNil(t, err)
assert.Equal(t, pubsub.ValidationReject, res)
}

View File

@@ -20,7 +20,7 @@ import (
// Clients who receive an attester slashing on this topic MUST validate the conditions within VerifyAttesterSlashing before
// forwarding it across the network.
func (s *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateAttesterSlashingPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
// Validation runs on publish (not just subscriptions), so we should approve any message from
// ourselves.
if pid == s.cfg.p2p.PeerID() {

View File

@@ -108,7 +108,7 @@ func TestValidateAttesterSlashing_ValidSlashing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
assert.NoError(t, err)
valid := res == pubsub.ValidationAccept
@@ -153,7 +153,7 @@ func TestValidateAttesterSlashing_ValidOldSlashing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
assert.ErrorContains(t, "validators were previously slashed", err)
valid := res == pubsub.ValidationIgnore
@@ -198,7 +198,7 @@ func TestValidateAttesterSlashing_InvalidSlashing_WithdrawableEpoch(t *testing.T
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
assert.NoError(t, err)
valid := res == pubsub.ValidationAccept
@@ -211,7 +211,7 @@ func TestValidateAttesterSlashing_InvalidSlashing_WithdrawableEpoch(t *testing.T
}
require.NoError(t, s.SetValidators(vals))
res, err = r.validateAttesterSlashing(ctx, "foobar", msg)
res, err = r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
assert.ErrorContains(t, "none of the validators are slashable", err)
invalid := res == pubsub.ValidationReject
@@ -257,7 +257,7 @@ func TestValidateAttesterSlashing_CanFilter(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
_ = err
ignored := res == pubsub.ValidationIgnore
assert.Equal(t, true, ignored)
@@ -278,7 +278,7 @@ func TestValidateAttesterSlashing_CanFilter(t *testing.T) {
Topic: &topic,
},
}
res, err = r.validateAttesterSlashing(ctx, "foobar", msg)
res, err = r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
_ = err
ignored = res == pubsub.ValidationIgnore
assert.Equal(t, true, ignored)
@@ -315,7 +315,7 @@ func TestValidateAttesterSlashing_ContextTimeout(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
_ = err
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "slashing from the far distant future should have timed out and returned false")
@@ -346,7 +346,7 @@ func TestValidateAttesterSlashing_Syncing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateAttesterSlashing(ctx, "foobar", msg)
res, err := r.validateAttesterSlashingPubSubMsg(ctx, "foobar", msg)
_ = err
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "Passed validation")

View File

@@ -35,7 +35,7 @@ import (
// - The attestation is unaggregated -- that is, it has exactly one participating validator (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1).
// - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot).
// - The signature of attestation is valid.
func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateBeaconAttestationPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}

View File

@@ -290,7 +290,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
m.Message.Topic = nil
}
res, err := s.validateCommitteeIndexBeaconAttestation(ctx, "", m)
res, err := s.validateBeaconAttestationPubSubMsg(ctx, "", m)
received := res == pubsub.ValidationAccept
if received != tt.want {
t.Fatalf("Did not received wanted validation. Got %v, wanted %v", !tt.want, tt.want)

View File

@@ -35,10 +35,10 @@ var (
errRejectCommitmentLen = errors.New("[REJECT] The length of KZG commitments is less than or equal to the limitation defined in Consensus Layer")
)
// validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature.
// validateBeaconBlockPubSubMsg checks that the incoming block has a valid BLS signature.
// Blocks that have already been seen are ignored. If the BLS signature is any valid signature,
// this method rebroadcasts the message.
func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateBeaconBlockPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
// Validation runs on publish (not just subscriptions), so we should approve any message from
// ourselves.

View File

@@ -101,7 +101,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorIs(t, err, signing.ErrSigFailedToVerify)
result := res == pubsub.ValidationReject
assert.Equal(t, true, result)
@@ -145,7 +145,7 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block present in DB should be ignored")
}
@@ -208,7 +208,7 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)
@@ -274,7 +274,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)
@@ -340,7 +340,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)
@@ -409,7 +409,7 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)
@@ -477,7 +477,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)
@@ -520,7 +520,7 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block is ignored until fully synced")
}
@@ -586,7 +586,7 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorContains(t, "early block, with current slot", err)
assert.Equal(t, res, pubsub.ValidationIgnore, "early block should be ignored and queued")
@@ -637,7 +637,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block from the future should be ignored")
}
@@ -688,7 +688,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorContains(t, "greater or equal to block slot", err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block from the past should be ignored")
}
@@ -750,7 +750,7 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
}
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "seen proposer block should be ignored")
}
@@ -801,7 +801,7 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
},
}
res, err := r.validateBeaconBlockPubSub(context.Background(), "", m)
res, err := r.validateBeaconBlockPubSubMsg(context.Background(), "", m)
_ = err
assert.Equal(t, pubsub.ValidationIgnore, res)
@@ -817,7 +817,7 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
},
}
res, err = r.validateBeaconBlockPubSub(context.Background(), "", m)
res, err = r.validateBeaconBlockPubSubMsg(context.Background(), "", m)
assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationIgnore, res)
}
@@ -884,7 +884,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.Equal(t, pubsub.ValidationReject, res, "Wrong validation result returned")
require.ErrorContains(t, "not descendant of finalized checkpoint", err)
}
@@ -950,7 +950,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorContains(t, "could not unmarshal bytes into signature", err)
assert.Equal(t, res, pubsub.ValidationReject, "block with invalid signature should be rejected")
@@ -982,7 +982,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
r.cfg.chain = chainService
r.cfg.clock = startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)
res, err = r.validateBeaconBlockPubSub(ctx, "", m)
res, err = r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorContains(t, "has an invalid parent", err)
// Expect block with bad parent to fail too
assert.Equal(t, res, pubsub.ValidationReject, "block with invalid parent should be ignored")
@@ -1044,7 +1044,7 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.ErrorContains(t, "unknown parent for block", err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block with unknown parent should be ignored")
bRoot, err = msg.Block.HashTreeRoot()
@@ -1129,7 +1129,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
assert.ErrorContains(t, "invalid parent", err)
assert.Equal(t, res, pubsub.ValidationReject)
}
@@ -1230,7 +1230,7 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.NoError(t, err)
result := res == pubsub.ValidationAccept
require.Equal(t, true, result)
@@ -1301,7 +1301,7 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.NotNil(t, err)
result := res == pubsub.ValidationReject
assert.Equal(t, true, result)
@@ -1462,7 +1462,7 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
},
}
res, err := r.validateBeaconBlockPubSub(ctx, "", m)
res, err := r.validateBeaconBlockPubSubMsg(ctx, "", m)
require.NoError(t, err)
result := res == pubsub.ValidationAccept
assert.Equal(t, true, result)

View File

@@ -17,7 +17,7 @@ import (
// Clients who receive a proposer slashing on this topic MUST validate the conditions within VerifyProposerSlashing before
// forwarding it across the network.
func (s *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateProposerSlashingPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
// Validation runs on publish (not just subscriptions), so we should approve any message from
// ourselves.
if pid == s.cfg.p2p.PeerID() {

View File

@@ -140,7 +140,7 @@ func TestValidateProposerSlashing_ValidSlashing(t *testing.T) {
},
}
res, err := r.validateProposerSlashing(ctx, "", m)
res, err := r.validateProposerSlashingPubSubMsg(ctx, "", m)
assert.NoError(t, err)
valid := res == pubsub.ValidationAccept
assert.Equal(t, true, valid, "Failed validation")
@@ -183,7 +183,7 @@ func TestValidateProposerSlashing_ValidOldSlashing(t *testing.T) {
},
}
res, err := r.validateProposerSlashing(ctx, "", m)
res, err := r.validateProposerSlashingPubSubMsg(ctx, "", m)
assert.ErrorContains(t, "proposer is already slashed", err)
valid := res == pubsub.ValidationIgnore
assert.Equal(t, true, valid, "Failed validation")
@@ -220,7 +220,7 @@ func TestValidateProposerSlashing_ContextTimeout(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateProposerSlashing(ctx, "", m)
res, err := r.validateProposerSlashingPubSubMsg(ctx, "", m)
assert.NotNil(t, err)
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "Slashing from the far distant future should have timed out and returned false")
@@ -250,7 +250,7 @@ func TestValidateProposerSlashing_Syncing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateProposerSlashing(ctx, "", m)
res, err := r.validateProposerSlashingPubSubMsg(ctx, "", m)
_ = err
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "Did not fail validation")

View File

@@ -20,7 +20,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)
// validateSyncContributionAndProof verifies the aggregated signature and the selection proof is valid before forwarding to the
// validateSyncCommitteeContributionAndProofPubSubMsg verifies the aggregated signature and the selection proof is valid before forwarding to the
// network and downstream services.
// Gossip Validation Conditions:
// [IGNORE] The contribution's slot is for the current slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance), i.e. contribution.slot == current_slot.
@@ -38,7 +38,7 @@ import (
// [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid.
// [REJECT] The aggregate signature is valid for the message beacon_block_root and aggregate pubkey derived from the participation
// info in aggregation_bits for the subcommittee specified by the contribution.subcommittee_index.
func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateSyncCommitteeContributionAndProofPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
ctx, span := trace.StartSpan(ctx, "sync.validateSyncContributionAndProof")
defer span.End()

View File

@@ -44,7 +44,7 @@ import (
func TestService_ValidateSyncContributionAndProof(t *testing.T) {
database := testingdb.SetupDB(t)
headRoot, keys := fillUpBlocksAndState(context.Background(), t, database)
defaultTopic := p2p.SyncContributionAndProofSubnetTopicFormat
defaultTopic := p2p.SyncCommitteeContributionAndProofSubnetTopicFormat
defaultTopic = fmt.Sprintf(defaultTopic, []byte{0xAB, 0x00, 0xCC, 0x9E})
defaultTopic = defaultTopic + "/" + encoder.ProtocolSuffixSSZSnappy
chainService := &mockChain.ChainService{
@@ -878,7 +878,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
}
}
require.Equal(t, true, svc.chainIsStarted())
if got, err := svc.validateSyncContributionAndProof(ctx, tt.args.pid, msg); got != tt.want {
if got, err := svc.validateSyncCommitteeContributionAndProofPubSubMsg(ctx, tt.args.pid, msg); got != tt.want {
_ = err
t.Errorf("validateSyncContributionAndProof() = %v, want %v", got, tt.want)
}
@@ -890,7 +890,7 @@ func TestValidateSyncContributionAndProof(t *testing.T) {
ctx := context.Background()
database := testingdb.SetupDB(t)
headRoot, keys := fillUpBlocksAndState(ctx, t, database)
defaultTopic := p2p.SyncContributionAndProofSubnetTopicFormat
defaultTopic := p2p.SyncCommitteeContributionAndProofSubnetTopicFormat
defaultTopic = fmt.Sprintf(defaultTopic, []byte{0xAB, 0x00, 0xCC, 0x9E})
defaultTopic = defaultTopic + "/" + encoder.ProtocolSuffixSSZSnappy
var emptySig [96]byte
@@ -1004,7 +1004,7 @@ func TestValidateSyncContributionAndProof(t *testing.T) {
opSub := s.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
_, err = s.validateSyncContributionAndProof(ctx, pid, pubsubMsg)
_, err = s.validateSyncCommitteeContributionAndProofPubSubMsg(ctx, pid, pubsubMsg)
require.NoError(t, err)
// Ensure the state notification was broadcast.

View File

@@ -17,7 +17,7 @@ import (
// Clients who receive a voluntary exit on this topic MUST validate the conditions within process_voluntary_exit before
// forwarding it across the network.
func (s *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateVoluntaryExitPubSubMsg(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
// Validation runs on publish (not just subscriptions), so we should approve any message from
// ourselves.
if pid == s.cfg.p2p.PeerID() {

View File

@@ -118,7 +118,7 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
res, err := r.validateVoluntaryExit(ctx, "", m)
res, err := r.validateVoluntaryExitPubSubMsg(ctx, "", m)
require.NoError(t, err)
require.Equal(t, pubsub.ValidationAccept, res, "Failed validation")
require.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
@@ -166,7 +166,7 @@ func TestValidateVoluntaryExit_InvalidExitSlot(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateVoluntaryExit(ctx, "", m)
res, err := r.validateVoluntaryExitPubSubMsg(ctx, "", m)
_ = err
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "passed validation")
@@ -197,7 +197,7 @@ func TestValidateVoluntaryExit_ValidExit_Syncing(t *testing.T) {
Topic: &topic,
},
}
res, err := r.validateVoluntaryExit(ctx, "", m)
res, err := r.validateVoluntaryExitPubSubMsg(ctx, "", m)
_ = err
valid := res == pubsub.ValidationAccept
assert.Equal(t, false, valid, "Validation should have failed")

View File

@@ -69,14 +69,14 @@ const (
var metricComparisonTests = []comparisonTest{
{
name: "beacon aggregate and proof",
topic1: fmt.Sprintf(p2pFailValidationTopic, p2p.AggregateAndProofSubnetTopicFormat),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, p2p.AggregateAndProofSubnetTopicFormat),
topic1: fmt.Sprintf(p2pFailValidationTopic, p2p.BeaconAggregateAndProofSubnetTopicFormat),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, p2p.BeaconAggregateAndProofSubnetTopicFormat),
expectedComparison: 0.8,
},
{
name: "committee index beacon attestations",
topic1: fmt.Sprintf(p2pFailValidationTopic, formatTopic(p2p.AttestationSubnetTopicFormat)),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, formatTopic(p2p.AttestationSubnetTopicFormat)),
topic1: fmt.Sprintf(p2pFailValidationTopic, formatTopic(p2p.BeaconAttestationSubnetTopicFormat)),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, formatTopic(p2p.BeaconAttestationSubnetTopicFormat)),
expectedComparison: 0.15,
},
{