Embed Config Pattern For Slasher, Slashing Protection (#8637)

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
This commit is contained in:
kevlu93
2021-03-21 18:53:17 +01:00
committed by GitHub
parent 4a64d4d133
commit 14439d2b12
19 changed files with 127 additions and 160 deletions

View File

@@ -20,7 +20,7 @@ func (s *Service) ChainHead(
) (*ethpb.ChainHead, error) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ChainHead")
defer span.End()
res, err := s.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
res, err := s.cfg.BeaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil || res == nil {
return nil, errors.Wrap(err, "Could not retrieve chain head or got nil chain head")
}
@@ -36,7 +36,7 @@ func (s *Service) GenesisValidatorsRoot(
defer span.End()
if s.genesisValidatorRoot == nil {
res, err := s.nodeClient.GetGenesis(ctx, &ptypes.Empty{})
res, err := s.cfg.NodeClient.GetGenesis(ctx, &ptypes.Empty{})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve genesis data")
}
@@ -51,7 +51,7 @@ func (s *Service) GenesisValidatorsRoot(
// Poll the beacon node every syncStatusPollingInterval until the node
// is no longer syncing.
func (s *Service) querySyncStatus(ctx context.Context) {
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}
@@ -65,7 +65,7 @@ func (s *Service) querySyncStatus(ctx context.Context) {
for {
select {
case <-ticker.C:
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}

View File

@@ -19,7 +19,7 @@ func TestService_ChainHead(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
}
wanted := &ethpb.ChainHead{
HeadSlot: 4,
@@ -38,7 +38,7 @@ func TestService_GenesisValidatorsRoot(t *testing.T) {
client := mock.NewMockNodeClient(ctrl)
bs := Service{
nodeClient: client,
cfg: &Config{NodeClient: client},
}
wanted := &ethpb.Genesis{
GenesisValidatorsRoot: []byte("I am genesis"),
@@ -60,7 +60,7 @@ func TestService_QuerySyncStatus(t *testing.T) {
client := mock.NewMockNodeClient(ctrl)
bs := Service{
nodeClient: client,
cfg: &Config{NodeClient: client},
}
syncStatusPollingInterval = time.Millisecond
client.EXPECT().GetSyncStatus(gomock.Any(), gomock.Any()).Return(&ethpb.SyncStatus{

View File

@@ -27,7 +27,7 @@ func (s *Service) RequestHistoricalAttestations(
if res == nil {
res = &ethpb.ListIndexedAttestationsResponse{}
}
res, err = s.beaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
res, err = s.cfg.BeaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
QueryFilter: &ethpb.ListIndexedAttestationsRequest_Epoch{
Epoch: epoch,
},

View File

@@ -25,8 +25,10 @@ func TestService_RequestHistoricalAttestations(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
slasherDB: db,
cfg: &Config{
BeaconClient: client,
SlasherDB: db,
},
}
numAtts := 1000

View File

@@ -27,7 +27,7 @@ var reconnectPeriod = 5 * time.Second
func (s *Service) ReceiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks")
defer span.End()
stream, err := s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err := s.cfg.BeaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
return
@@ -53,7 +53,7 @@ func (s *Service) ReceiveBlocks(ctx context.Context) {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err = s.cfg.BeaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Could not restart block stream")
return
@@ -93,7 +93,7 @@ func (s *Service) ReceiveBlocks(ctx context.Context) {
func (s *Service) ReceiveAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveAttestations")
defer span.End()
stream, err := s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err := s.cfg.BeaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Failed to retrieve attestations stream")
return
@@ -122,7 +122,7 @@ func (s *Service) ReceiveAttestations(ctx context.Context) {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err = s.cfg.BeaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not restart attestation stream")
return
@@ -162,7 +162,7 @@ func (s *Service) collectReceivedAttestations(ctx context.Context) {
case att := <-s.receivedAttestationsBuffer:
atts = append(atts, att)
case collectedAtts := <-s.collectedAttestationsBuffer:
if err := s.slasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
if err := s.cfg.SlasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
log.WithError(err).Error("Could not save indexed attestation")
continue
}
@@ -197,7 +197,7 @@ func (s *Service) restartBeaconConnection(ctx context.Context) error {
log.Info("Beacon node is still down")
continue
}
s, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
s, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
continue

View File

@@ -21,8 +21,8 @@ func TestService_ReceiveBlocks(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
blockFeed: new(event.Feed),
cfg: &Config{BeaconClient: client},
blockFeed: new(event.Feed),
}
stream := mock.NewMockBeaconChain_StreamBlocksClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
@@ -46,7 +46,7 @@ func TestService_ReceiveAttestations(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
blockFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
@@ -78,9 +78,11 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
cfg: &Config{
BeaconClient: client,
SlasherDB: testDB.SetupSlasherDB(t, false),
},
blockFeed: new(event.Feed),
slasherDB: testDB.SetupSlasherDB(t, false),
attestationFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),

View File

@@ -39,21 +39,15 @@ type ChainFetcher interface {
// Service struct for the beaconclient service of the slasher.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
cert string
conn *grpc.ClientConn
provider string
beaconClient ethpb.BeaconChainClient
slasherDB db.Database
nodeClient ethpb.NodeClient
clientFeed *event.Feed
blockFeed *event.Feed
attestationFeed *event.Feed
proposerSlashingsChan chan *ethpb.ProposerSlashing
attesterSlashingsChan chan *ethpb.AttesterSlashing
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
receivedAttestationsBuffer chan *ethpb.IndexedAttestation
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
publicKeyCache *cache.PublicKeyCache
@@ -82,23 +76,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
}
return &Service{
cert: cfg.BeaconCert,
cfg: cfg,
ctx: ctx,
cancel: cancel,
provider: cfg.BeaconProvider,
blockFeed: new(event.Feed),
clientFeed: new(event.Feed),
attestationFeed: new(event.Feed),
slasherDB: cfg.SlasherDB,
proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1),
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
publicKeyCache: publicKeyCache,
beaconClient: cfg.BeaconClient,
nodeClient: cfg.NodeClient,
}, nil
}
@@ -145,8 +133,8 @@ func (s *Service) Status() error {
// after they are detected by other services in the slasher.
func (s *Service) Start() {
var dialOpt grpc.DialOption
if s.cert != "" {
creds, err := credentials.NewClientTLSFromFile(s.cert, "")
if s.cfg.BeaconCert != "" {
creds, err := credentials.NewClientTLSFromFile(s.cfg.BeaconCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
}
@@ -173,15 +161,15 @@ func (s *Service) Start() {
grpcutils.LogRequests,
)),
}
conn, err := grpc.DialContext(s.ctx, s.provider, beaconOpts...)
conn, err := grpc.DialContext(s.ctx, s.cfg.BeaconProvider, beaconOpts...)
if err != nil {
log.Fatalf("Could not dial endpoint: %s, %v", s.provider, err)
log.Fatalf("Could not dial endpoint: %s, %v", s.cfg.BeaconProvider, err)
}
s.beaconDialOptions = beaconOpts
log.Info("Successfully started gRPC connection")
s.conn = conn
s.beaconClient = ethpb.NewBeaconChainClient(s.conn)
s.nodeClient = ethpb.NewNodeClient(s.conn)
s.cfg.BeaconClient = ethpb.NewBeaconChainClient(s.conn)
s.cfg.NodeClient = ethpb.NewNodeClient(s.conn)
// We poll for the sync status of the beacon node until it is fully synced.
s.querySyncStatus(s.ctx)

View File

@@ -18,12 +18,12 @@ import (
func (s *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitProposerSlashing")
defer span.End()
sub := s.proposerSlashingsFeed.Subscribe(ch)
sub := s.cfg.ProposerSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := s.beaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
if _, err := s.cfg.BeaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-sub.Err():
@@ -43,14 +43,14 @@ func (s *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch cha
func (s *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitAttesterSlashing")
defer span.End()
sub := s.attesterSlashingsFeed.Subscribe(ch)
sub := s.cfg.AttesterSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if slashing != nil && slashing.Attestation_1 != nil && slashing.Attestation_2 != nil {
slashableIndices := sliceutil.IntersectionUint64(slashing.Attestation_1.AttestingIndices, slashing.Attestation_2.AttestingIndices)
_, err := s.beaconClient.SubmitAttesterSlashing(ctx, slashing)
_, err := s.cfg.BeaconClient.SubmitAttesterSlashing(ctx, slashing)
if err == nil {
log.WithFields(logrus.Fields{
"sourceEpoch": slashing.Attestation_1.Data.Source.Epoch,

View File

@@ -19,8 +19,10 @@ func TestService_SubscribeDetectedProposerSlashings(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
proposerSlashingsFeed: new(event.Feed),
cfg: &Config{
BeaconClient: client,
ProposerSlashingsFeed: new(event.Feed),
},
}
slashing := &ethpb.ProposerSlashing{
@@ -61,8 +63,10 @@ func TestService_SubscribeDetectedAttesterSlashings(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)
bs := Service{
beaconClient: client,
attesterSlashingsFeed: new(event.Feed),
cfg: &Config{
BeaconClient: client,
AttesterSlashingsFeed: new(event.Feed),
},
}
slashing := &ethpb.AttesterSlashing{

View File

@@ -43,7 +43,7 @@ func (s *Service) FindOrGetPublicKeys(
if notFound == 0 {
return validators, nil
}
vc, err := s.beaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
vc, err := s.cfg.BeaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
Indices: validatorIndices,
})
if err != nil {

View File

@@ -24,7 +24,7 @@ func TestService_RequestValidator(t *testing.T) {
validatorCache, err := cache.NewPublicKeyCache(0, nil)
require.NoError(t, err, "Could not create new cache")
bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
publicKeyCache: validatorCache,
}
wanted := &ethpb.Validators{

View File

@@ -73,7 +73,7 @@ func (s *Service) DetectAttesterSlashings(
}
}
if len(slashings) > 0 {
if err := s.slasherDB.SaveAttesterSlashings(ctx, status.Active, slashings); err != nil {
if err := s.cfg.SlasherDB.SaveAttesterSlashings(ctx, status.Active, slashings); err != nil {
return nil, err
}
}
@@ -187,7 +187,7 @@ func (s *Service) mapResultsToAtts(ctx context.Context, results []*types.Detecti
if _, ok := resultsToAtts[resultKey]; ok {
continue
}
matchingAtts, err := s.slasherDB.IndexedAttestationsWithPrefix(ctx, result.SlashableEpoch, result.SigBytes[:])
matchingAtts, err := s.cfg.SlasherDB.IndexedAttestationsWithPrefix(ctx, result.SlashableEpoch, result.SigBytes[:])
if err != nil {
return nil, err
}
@@ -214,7 +214,7 @@ func isDoubleVote(incomingAtt, prevAtt *ethpb.IndexedAttestation) bool {
// UpdateHighestAttestation updates to the db the highest source and target attestations for a each validator.
func (s *Service) UpdateHighestAttestation(ctx context.Context, att *ethpb.IndexedAttestation) error {
for _, idx := range att.AttestingIndices {
h, err := s.slasherDB.HighestAttestation(ctx, idx)
h, err := s.cfg.SlasherDB.HighestAttestation(ctx, idx)
if err != nil {
return err
}
@@ -238,7 +238,7 @@ func (s *Service) UpdateHighestAttestation(ctx context.Context, att *ethpb.Index
// If it's not a new instance of HighestAttestation, changing it will also change the cached instance.
if update {
if err := s.slasherDB.SaveHighestAttestation(ctx, h); err != nil {
if err := s.cfg.SlasherDB.SaveHighestAttestation(ctx, h); err != nil {
return err
}
}

View File

@@ -173,7 +173,7 @@ func TestDetect_detectAttesterSlashings_Surround(t *testing.T) {
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
cfg: &Config{SlasherDB: db},
minMaxSpanDetector: attestations.NewSpanDetector(db),
}
require.NoError(t, db.SaveIndexedAttestations(ctx, tt.savedAtts))
@@ -324,7 +324,7 @@ func TestDetect_detectAttesterSlashings_Double(t *testing.T) {
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
cfg: &Config{SlasherDB: db},
minMaxSpanDetector: attestations.NewSpanDetector(db),
}
require.NoError(t, db.SaveIndexedAttestations(ctx, tt.savedAtts))
@@ -479,7 +479,7 @@ func TestDetect_updateHighestAttestation(t *testing.T) {
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
cfg: &Config{SlasherDB: db},
proposalsDetector: proposals.NewProposeDetector(db),
}
require.NoError(t, db.SaveHighestAttestation(ctx, tt.savedHighest))
@@ -537,7 +537,7 @@ func TestDetect_detectProposerSlashing(t *testing.T) {
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
cfg: &Config{SlasherDB: db},
proposalsDetector: proposals.NewProposeDetector(db),
}
require.NoError(t, db.SaveBlockHeader(ctx, tt.blk))
@@ -616,7 +616,7 @@ func TestDetect_detectProposerSlashingNoUpdate(t *testing.T) {
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
cfg: &Config{SlasherDB: db},
proposalsDetector: proposals.NewProposeDetector(db),
}
require.NoError(t, db.SaveBlockHeader(ctx, tt.blk))
@@ -632,8 +632,8 @@ func TestServer_MapResultsToAtts(t *testing.T) {
db := testDB.SetupSlasherDB(t, false)
ctx := context.Background()
ds := Service{
ctx: ctx,
slasherDB: db,
ctx: ctx,
cfg: &Config{SlasherDB: db},
}
// 3 unique results, but 7 validators in total.
results := []*types.DetectionResult{
@@ -702,7 +702,7 @@ func TestServer_MapResultsToAtts(t *testing.T) {
},
}
for _, atts := range expectedResultsToAtts {
require.NoError(t, ds.slasherDB.SaveIndexedAttestations(ctx, atts))
require.NoError(t, ds.cfg.SlasherDB.SaveIndexedAttestations(ctx, atts))
}
resultsToAtts, err := ds.mapResultsToAtts(ctx, results)

View File

@@ -21,7 +21,7 @@ import (
func (s *Service) detectIncomingBlocks(ctx context.Context, ch chan *ethpb.SignedBeaconBlock) {
ctx, span := trace.StartSpan(ctx, "detection.detectIncomingBlocks")
defer span.End()
sub := s.notifier.BlockFeed().Subscribe(ch)
sub := s.cfg.Notifier.BlockFeed().Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
@@ -54,7 +54,7 @@ func (s *Service) detectIncomingBlocks(ctx context.Context, ch chan *ethpb.Signe
func (s *Service) detectIncomingAttestations(ctx context.Context, ch chan *ethpb.IndexedAttestation) {
ctx, span := trace.StartSpan(ctx, "detection.detectIncomingAttestations")
defer span.End()
sub := s.notifier.AttestationFeed().Subscribe(ch)
sub := s.cfg.Notifier.AttestationFeed().Subscribe(ch)
defer sub.Unsubscribe()
for {
select {

View File

@@ -38,7 +38,7 @@ func TestService_DetectIncomingBlocks(t *testing.T) {
hook := logTest.NewGlobal()
db := testDB.SetupSlasherDB(t, false)
ds := Service{
notifier: &mockNotifier{},
cfg: &Config{Notifier: &mockNotifier{}},
proposalsDetector: proposals.NewProposeDetector(db),
}
blk := &ethpb.SignedBeaconBlock{
@@ -61,9 +61,11 @@ func TestService_DetectIncomingBlocks(t *testing.T) {
func TestService_DetectIncomingAttestations(t *testing.T) {
hook := logTest.NewGlobal()
ds := Service{
notifier: &mockNotifier{},
minMaxSpanDetector: &attestations.MockSpanDetector{},
attesterSlashingsFeed: new(event.Feed),
cfg: &Config{
Notifier: &mockNotifier{},
AttesterSlashingsFeed: new(event.Feed),
},
minMaxSpanDetector: &attestations.MockSpanDetector{},
}
att := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{

View File

@@ -48,20 +48,14 @@ func (s Status) String() string {
// Service struct for the detection service of the slasher.
type Service struct {
ctx context.Context
cancel context.CancelFunc
slasherDB db.Database
blocksChan chan *ethpb.SignedBeaconBlock
attsChan chan *ethpb.IndexedAttestation
notifier beaconclient.Notifier
chainFetcher beaconclient.ChainFetcher
beaconClient *beaconclient.Service
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
minMaxSpanDetector iface.SpanDetector
proposalsDetector proposerIface.ProposalsDetector
historicalDetection bool
status Status
cfg *Config
ctx context.Context
cancel context.CancelFunc
blocksChan chan *ethpb.SignedBeaconBlock
attsChan chan *ethpb.IndexedAttestation
minMaxSpanDetector iface.SpanDetector
proposalsDetector proposerIface.ProposalsDetector
status Status
}
// Config options for the detection service.
@@ -79,20 +73,14 @@ type Config struct {
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
notifier: cfg.Notifier,
chainFetcher: cfg.ChainFetcher,
slasherDB: cfg.SlasherDB,
beaconClient: cfg.BeaconClient,
blocksChan: make(chan *ethpb.SignedBeaconBlock, 1),
attsChan: make(chan *ethpb.IndexedAttestation, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
minMaxSpanDetector: attestations.NewSpanDetector(cfg.SlasherDB),
proposalsDetector: proposals.NewProposeDetector(cfg.SlasherDB),
historicalDetection: cfg.HistoricalDetection,
status: None,
cfg: cfg,
ctx: ctx,
cancel: cancel,
blocksChan: make(chan *ethpb.SignedBeaconBlock, 1),
attsChan: make(chan *ethpb.IndexedAttestation, 1),
minMaxSpanDetector: attestations.NewSpanDetector(cfg.SlasherDB),
proposalsDetector: proposals.NewProposeDetector(cfg.SlasherDB),
status: None,
}
}
@@ -117,12 +105,12 @@ func (s *Service) Start() {
// to be fully synced before proceeding.
s.status = Started
ch := make(chan bool)
sub := s.notifier.ClientReadyFeed().Subscribe(ch)
sub := s.cfg.Notifier.ClientReadyFeed().Subscribe(ch)
s.status = Syncing
<-ch
sub.Unsubscribe()
if s.historicalDetection {
if s.cfg.HistoricalDetection {
// The detection service runs detection on all historical
// chain data since genesis.
s.status = HistoricalDetection
@@ -130,8 +118,8 @@ func (s *Service) Start() {
}
s.status = Ready
// We listen to a stream of blocks and attestations from the beacon node.
go s.beaconClient.ReceiveBlocks(s.ctx)
go s.beaconClient.ReceiveAttestations(s.ctx)
go s.cfg.BeaconClient.ReceiveBlocks(s.ctx)
go s.cfg.BeaconClient.ReceiveAttestations(s.ctx)
// We subscribe to incoming blocks from the beacon node via
// our gRPC client to keep detecting slashable offenses.
go s.detectIncomingBlocks(s.ctx, s.blocksChan)
@@ -143,12 +131,12 @@ func (s *Service) detectHistoricalChainData(ctx context.Context) {
defer span.End()
// We fetch both the latest persisted chain head in our DB as well
// as the current chain head from the beacon node via gRPC.
latestStoredHead, err := s.slasherDB.ChainHead(ctx)
latestStoredHead, err := s.cfg.SlasherDB.ChainHead(ctx)
if err != nil {
log.WithError(err).Error("Could not retrieve chain head from DB")
return
}
currentChainHead, err := s.chainFetcher.ChainHead(ctx)
currentChainHead, err := s.cfg.ChainFetcher.ChainHead(ctx)
if err != nil {
log.WithError(err).Error("Cannot retrieve chain head from beacon node")
return
@@ -169,12 +157,12 @@ func (s *Service) detectHistoricalChainData(ctx context.Context) {
log.WithError(err).Errorf("Could not fetch attestations for epoch: %d", epoch)
return
}
indexedAtts, err := s.beaconClient.RequestHistoricalAttestations(ctx, epoch)
indexedAtts, err := s.cfg.BeaconClient.RequestHistoricalAttestations(ctx, epoch)
if err != nil {
log.WithError(err).Errorf("Could not fetch attestations for epoch: %d", epoch)
return
}
if err := s.slasherDB.SaveIndexedAttestations(ctx, indexedAtts); err != nil {
if err := s.cfg.SlasherDB.SaveIndexedAttestations(ctx, indexedAtts); err != nil {
log.WithError(err).Error("could not save indexed attestations")
return
}
@@ -201,13 +189,13 @@ func (s *Service) detectHistoricalChainData(ctx context.Context) {
}
}
latestStoredHead = &ethpb.ChainHead{HeadEpoch: epoch}
if err := s.slasherDB.SaveChainHead(ctx, latestStoredHead); err != nil {
if err := s.cfg.SlasherDB.SaveChainHead(ctx, latestStoredHead); err != nil {
log.WithError(err).Error("Could not persist chain head to disk")
}
storedEpoch = epoch
s.slasherDB.RemoveOldestFromCache(ctx)
s.cfg.SlasherDB.RemoveOldestFromCache(ctx)
if epoch == currentChainHead.HeadEpoch-1 {
currentChainHead, err = s.chainFetcher.ChainHead(ctx)
currentChainHead, err = s.cfg.ChainFetcher.ChainHead(ctx)
if err != nil {
log.WithError(err).Error("Cannot retrieve chain head from beacon node")
return
@@ -224,7 +212,7 @@ func (s *Service) submitAttesterSlashings(ctx context.Context, slashings []*ethp
ctx, span := trace.StartSpan(ctx, "detection.submitAttesterSlashings")
defer span.End()
for i := 0; i < len(slashings); i++ {
s.attesterSlashingsFeed.Send(slashings[i])
s.cfg.AttesterSlashingsFeed.Send(slashings[i])
}
}
@@ -238,6 +226,6 @@ func (s *Service) submitProposerSlashing(ctx context.Context, slashing *ethpb.Pr
"proposerIdxHeader1": slashing.Header_1.Header.ProposerIndex,
"proposerIdxHeader2": slashing.Header_2.Header.ProposerIndex,
}).Info("Found a proposer slashing! Submitting to beacon node")
s.proposerSlashingsFeed.Send(slashing)
s.cfg.ProposerSlashingsFeed.Send(slashing)
}
}

View File

@@ -26,18 +26,12 @@ import (
// Service defines a server implementation of the gRPC Slasher service,
// providing RPC endpoints for retrieving slashing proofs for malicious validators.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
host string
port string
detector *detection.Service
listener net.Listener
grpcServer *grpc.Server
slasherDB db.Database
withCert string
withKey string
credentialError error
beaconclient *beaconclient.Service
}
// Config options for the slasher node RPC server.
@@ -56,21 +50,15 @@ type Config struct {
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
host: cfg.Host,
port: cfg.Port,
detector: cfg.Detector,
slasherDB: cfg.SlasherDB,
withCert: cfg.CertFlag,
withKey: cfg.KeyFlag,
beaconclient: cfg.BeaconClient,
cfg: cfg,
ctx: ctx,
cancel: cancel,
}
}
// Start the gRPC service.
func (s *Service) Start() {
address := fmt.Sprintf("%s:%s", s.host, s.port)
address := fmt.Sprintf("%s:%s", s.cfg.Host, s.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Errorf("Could not listen to port in Start() %s: %v", address, err)
@@ -98,8 +86,8 @@ func (s *Service) Start() {
grpc_prometheus.EnableHandlingTimeHistogram()
// TODO(#791): Utilize a certificate for secure connections
// between beacon nodes and validator clients.
if s.withCert != "" && s.withKey != "" {
creds, err := credentials.NewServerTLSFromFile(s.withCert, s.withKey)
if s.cfg.CertFlag != "" && s.cfg.KeyFlag != "" {
creds, err := credentials.NewServerTLSFromFile(s.cfg.CertFlag, s.cfg.KeyFlag)
if err != nil {
log.Errorf("Could not load TLS keys: %s", err)
s.credentialError = err
@@ -114,9 +102,9 @@ func (s *Service) Start() {
slasherServer := &Server{
ctx: s.ctx,
detector: s.detector,
slasherDB: s.slasherDB,
beaconClient: s.beaconclient,
detector: s.cfg.Detector,
slasherDB: s.cfg.SlasherDB,
beaconClient: s.cfg.BeaconClient,
}
slashpb.RegisterSlasherServer(s.grpcServer, slasherServer)
@@ -149,8 +137,8 @@ func (s *Service) Status() error {
if s.credentialError != nil {
return s.credentialError
}
if bs := s.beaconclient.Status(); bs != nil {
if bs := s.cfg.BeaconClient.Status(); bs != nil {
return bs
}
return s.detector.Status()
return s.cfg.Detector.Status()
}

View File

@@ -22,16 +22,12 @@ import (
// Service represents a service to manage the validator
// ￿slashing protection.
type Service struct {
ctx context.Context
cancel context.CancelFunc
conn *grpc.ClientConn
endpoint string
withCert string
maxCallRecvMsgSize int
grpcRetries uint
grpcHeaders []string
slasherClient ethsl.SlasherClient
grpcRetryDelay time.Duration
cfg *Config
ctx context.Context
cancel context.CancelFunc
conn *grpc.ClientConn
grpcHeaders []string
slasherClient ethsl.SlasherClient
}
// Config for the validator service.
@@ -49,20 +45,16 @@ type Config struct {
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
endpoint: cfg.Endpoint,
withCert: cfg.CertFlag,
maxCallRecvMsgSize: cfg.GrpcMaxCallRecvMsgSizeFlag,
grpcRetries: cfg.GrpcRetriesFlag,
grpcRetryDelay: cfg.GrpcRetryDelay,
grpcHeaders: strings.Split(cfg.GrpcHeadersFlag, ","),
cfg: cfg,
ctx: ctx,
cancel: cancel,
grpcHeaders: strings.Split(cfg.GrpcHeadersFlag, ","),
}, nil
}
// Start the slasher protection service and grpc client.
func (s *Service) Start() {
if s.endpoint != "" {
if s.cfg.Endpoint != "" {
s.slasherClient = s.startSlasherClient()
}
}
@@ -70,8 +62,8 @@ func (s *Service) Start() {
func (s *Service) startSlasherClient() ethsl.SlasherClient {
var dialOpt grpc.DialOption
if s.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(s.withCert, "")
if s.cfg.CertFlag != "" {
creds, err := credentials.NewClientTLSFromFile(s.cfg.CertFlag, "")
if err != nil {
log.Errorf("Could not get valid slasher credentials: %v", err)
return nil
@@ -87,8 +79,8 @@ func (s *Service) startSlasherClient() ethsl.SlasherClient {
opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc_retry.WithMax(s.grpcRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(s.grpcRetryDelay)),
grpc_retry.WithMax(s.cfg.GrpcRetriesFlag),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(s.cfg.GrpcRetryDelay)),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
@@ -103,9 +95,9 @@ func (s *Service) startSlasherClient() ethsl.SlasherClient {
grpcutils.LogRequests,
)),
}
conn, err := grpc.DialContext(s.ctx, s.endpoint, opts...)
conn, err := grpc.DialContext(s.ctx, s.cfg.Endpoint, opts...)
if err != nil {
log.Errorf("Could not dial slasher endpoint: %s, %v", s.endpoint, err)
log.Errorf("Could not dial slasher endpoint: %s, %v", s.cfg.Endpoint, err)
return nil
}
log.Debug("Successfully started slasher gRPC connection")
@@ -131,7 +123,7 @@ func (s *Service) Status() error {
return errors.New("no connection to slasher RPC")
}
if s.conn.GetState() != connectivity.Ready {
return fmt.Errorf("can`t connect to slasher server at: %v connection status: %v ", s.endpoint, s.conn.GetState())
return fmt.Errorf("can`t connect to slasher server at: %v connection status: %v ", s.cfg.Endpoint, s.conn.GetState())
}
return nil
}

View File

@@ -11,6 +11,7 @@ import (
func TestGrpcHeaders(t *testing.T) {
s := &Service{
cfg: &Config{},
ctx: context.Background(),
grpcHeaders: []string{"first=value1", "second=value2"},
}