mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Fix locks and fallback to db read if attestation history map is missing a pub key data (#7937)
* minimal change to handle nil attesterHistoryByPubKey
* Revert "Always Update Attesting History If Not Slashable (#7935)"
This reverts commit 3cc2ebc5d5.
* remove unused functions
* move save before propose
* wait before go func
* move wait into the go routine
* handling map mutation
* remove map handling in this case
* log in case it is still not found
* fix log
* fix locks
* Update validator/client/attest_protect.go
* remove code duplication
* remove method extraction
* move metrics to their appropriate place
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
@@ -113,11 +113,9 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot uint64, pubKey [
|
||||
).Debug("Attempted slashable attestation details")
|
||||
return
|
||||
}
|
||||
|
||||
if err := v.SaveProtection(ctx, pubKey); err != nil {
|
||||
log.WithError(err).Errorf("Could not save validator: %#x protection", pubKey)
|
||||
}
|
||||
|
||||
attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not submit attestation to beacon node")
|
||||
|
||||
@@ -19,36 +19,43 @@ var failedPostAttSignExternalErr = "external slasher service detected a submitte
|
||||
|
||||
func (v *validator) preAttSignValidations(ctx context.Context, indexedAtt *ethpb.IndexedAttestation, pubKey [48]byte) error {
|
||||
fmtKey := fmt.Sprintf("%#x", pubKey[:])
|
||||
|
||||
v.attesterHistoryByPubKeyLock.RLock()
|
||||
attesterHistory, ok := v.attesterHistoryByPubKey[pubKey]
|
||||
v.attesterHistoryByPubKeyLock.RUnlock()
|
||||
if !ok {
|
||||
AttestationMapMiss.Inc()
|
||||
attesterHistoryMap, err := v.db.AttestationHistoryForPubKeysV2(ctx, [][48]byte{pubKey})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get attester history")
|
||||
}
|
||||
attesterHistory, ok = attesterHistoryMap[pubKey]
|
||||
if !ok {
|
||||
log.WithField("publicKey", fmtKey).Debug("Could not get local slashing protection data for validator in pre validation")
|
||||
}
|
||||
} else {
|
||||
AttestationMapHit.Inc()
|
||||
}
|
||||
_, sr, err := v.getDomainAndSigningRoot(ctx, indexedAtt.Data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get domain and signing root from attestation")
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
slashable, err := isNewAttSlashable(
|
||||
ctx,
|
||||
attesterHistory,
|
||||
indexedAtt.Data.Source.Epoch,
|
||||
indexedAtt.Data.Target.Epoch,
|
||||
sr,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not check if attestation is slashable")
|
||||
}
|
||||
if slashable {
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return errors.New(failedAttLocalProtectionErr)
|
||||
}
|
||||
} else {
|
||||
log.WithField("publicKey", fmtKey).Debug("Could not get local slashing protection data for validator in pre validation")
|
||||
slashable, err := isNewAttSlashable(
|
||||
ctx,
|
||||
attesterHistory,
|
||||
indexedAtt.Data.Source.Epoch,
|
||||
indexedAtt.Data.Target.Epoch,
|
||||
sr,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not check if attestation is slashable")
|
||||
}
|
||||
if slashable {
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return errors.New(failedAttLocalProtectionErr)
|
||||
}
|
||||
|
||||
if featureconfig.Get().SlasherProtection && v.protector != nil {
|
||||
if !v.protector.CheckAttestationSafety(ctx, indexedAtt) {
|
||||
if v.emitAccountMetrics {
|
||||
@@ -64,33 +71,39 @@ func (v *validator) postAttSignUpdate(ctx context.Context, indexedAtt *ethpb.Ind
|
||||
fmtKey := fmt.Sprintf("%#x", pubKey[:])
|
||||
v.attesterHistoryByPubKeyLock.Lock()
|
||||
defer v.attesterHistoryByPubKeyLock.Unlock()
|
||||
var newHistory kv.EncHistoryData
|
||||
attesterHistory, ok := v.attesterHistoryByPubKey[pubKey]
|
||||
if ok {
|
||||
slashable, err := isNewAttSlashable(
|
||||
ctx,
|
||||
attesterHistory,
|
||||
indexedAtt.Data.Source.Epoch,
|
||||
indexedAtt.Data.Target.Epoch,
|
||||
signingRoot,
|
||||
)
|
||||
if !ok {
|
||||
AttestationMapMiss.Inc()
|
||||
attesterHistoryMap, err := v.db.AttestationHistoryForPubKeysV2(ctx, [][48]byte{pubKey})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not check if attestation is slashable")
|
||||
return errors.Wrap(err, "could not get attester history")
|
||||
}
|
||||
if slashable {
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return errors.New(failedAttLocalProtectionErr)
|
||||
attesterHistory, ok = attesterHistoryMap[pubKey]
|
||||
if !ok {
|
||||
log.WithField("publicKey", fmtKey).Debug("Could not get local slashing protection data for validator in post validation")
|
||||
}
|
||||
newHistory = attesterHistory
|
||||
} else {
|
||||
log.WithField("publicKey", fmtKey).Debug("Could not get local slashing protection data for validator in post validation")
|
||||
newHistory = kv.NewAttestationHistoryArray(indexedAtt.Data.Target.Epoch)
|
||||
AttestationMapHit.Inc()
|
||||
}
|
||||
updatedHistory, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(
|
||||
slashable, err := isNewAttSlashable(
|
||||
ctx,
|
||||
newHistory,
|
||||
attesterHistory,
|
||||
indexedAtt.Data.Source.Epoch,
|
||||
indexedAtt.Data.Target.Epoch,
|
||||
signingRoot,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not check if attestation is slashable")
|
||||
}
|
||||
if slashable {
|
||||
if v.emitAccountMetrics {
|
||||
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
|
||||
}
|
||||
return errors.New(failedAttLocalProtectionErr)
|
||||
}
|
||||
newHistory, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(
|
||||
ctx,
|
||||
attesterHistory,
|
||||
indexedAtt.Data.Target.Epoch,
|
||||
&kv.HistoryData{
|
||||
Source: indexedAtt.Data.Source.Epoch,
|
||||
@@ -100,7 +113,7 @@ func (v *validator) postAttSignUpdate(ctx context.Context, indexedAtt *ethpb.Ind
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not mark epoch %d as attested", indexedAtt.Data.Target.Epoch)
|
||||
}
|
||||
v.attesterHistoryByPubKey[pubKey] = updatedHistory
|
||||
v.attesterHistoryByPubKey[pubKey] = newHistory
|
||||
|
||||
if featureconfig.Get().SlasherProtection && v.protector != nil {
|
||||
if !v.protector.CommitAttestation(ctx, indexedAtt) {
|
||||
|
||||
@@ -14,6 +14,18 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// AttestationMapMiss used to track the success rate of historical
|
||||
// attestation map for slashing detection flow.
|
||||
AttestationMapHit = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "attestation_history_map_hit",
|
||||
Help: "The number of attestation history calls that are present in the map.",
|
||||
})
|
||||
// AttestationMapMiss used to track the use of the fallback db read when
|
||||
// attestation map is being mutated while being used in the slashing detection flow.
|
||||
AttestationMapMiss = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "attestation_history_map_miss",
|
||||
Help: "The number of attestation history calls that are'nt present in the map.",
|
||||
})
|
||||
// ValidatorStatusesGaugeVec used to track validator statuses by public key.
|
||||
ValidatorStatusesGaugeVec = promauto.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
|
||||
@@ -119,12 +119,6 @@ func (fv *FakeValidator) LogValidatorGainsAndLosses(_ context.Context, _ uint64)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveProtections for mocking.
|
||||
func (fv *FakeValidator) SaveProtections(_ context.Context) error {
|
||||
fv.SaveProtectionsCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetAttesterProtectionData for mocking.
|
||||
func (fv *FakeValidator) ResetAttesterProtectionData() {
|
||||
fv.DeleteProtectionCalled = true
|
||||
|
||||
@@ -33,7 +33,6 @@ type Validator interface {
|
||||
ProposeBlock(ctx context.Context, slot uint64, pubKey [48]byte)
|
||||
SubmitAggregateAndProof(ctx context.Context, slot uint64, pubKey [48]byte)
|
||||
LogAttestationsSubmitted()
|
||||
SaveProtections(ctx context.Context) error
|
||||
ResetAttesterProtectionData()
|
||||
UpdateDomainDataCaches(ctx context.Context, slot uint64)
|
||||
WaitForWalletInitialization(ctx context.Context) error
|
||||
@@ -155,11 +154,11 @@ func run(ctx context.Context, v Validator) {
|
||||
}
|
||||
}
|
||||
// Wait for all processes to complete, then report span complete.
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
v.ResetAttesterProtectionData()
|
||||
v.LogAttestationsSubmitted()
|
||||
// Log this client performance in the previous epoch
|
||||
v.LogAttestationsSubmitted()
|
||||
if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
|
||||
log.WithError(err).Error("Could not report validator's rewards/penalties")
|
||||
}
|
||||
|
||||
@@ -525,20 +525,6 @@ func (v *validator) UpdateProtections(ctx context.Context, slot uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveProtections saves the attestation information currently in validator state.
|
||||
func (v *validator) SaveProtections(ctx context.Context) error {
|
||||
v.attesterHistoryByPubKeyLock.RLock()
|
||||
if err := v.db.SaveAttestationHistoryForPubKeysV2(ctx, v.attesterHistoryByPubKey); err != nil {
|
||||
return errors.Wrap(err, "could not save attester history to DB")
|
||||
}
|
||||
v.attesterHistoryByPubKeyLock.RUnlock()
|
||||
v.attesterHistoryByPubKeyLock.Lock()
|
||||
v.attesterHistoryByPubKey = make(map[[48]byte]kv.EncHistoryData)
|
||||
v.attesterHistoryByPubKeyLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetAttesterProtectionData reset validators protection data.
|
||||
func (v *validator) ResetAttesterProtectionData() {
|
||||
v.attesterHistoryByPubKeyLock.Lock()
|
||||
@@ -549,11 +535,11 @@ func (v *validator) ResetAttesterProtectionData() {
|
||||
// SaveProtection saves the attestation information currently in validator state.
|
||||
func (v *validator) SaveProtection(ctx context.Context, pubKey [48]byte) error {
|
||||
v.attesterHistoryByPubKeyLock.RLock()
|
||||
|
||||
defer v.attesterHistoryByPubKeyLock.RUnlock()
|
||||
if err := v.db.SaveAttestationHistoryForPubKeyV2(ctx, pubKey, v.attesterHistoryByPubKey[pubKey]); err != nil {
|
||||
return errors.Wrapf(err, "could not save attester with public key %#x history to DB", pubKey)
|
||||
}
|
||||
v.attesterHistoryByPubKeyLock.RUnlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -784,51 +784,6 @@ func TestUpdateProtections_OK(t *testing.T) {
|
||||
require.DeepEqual(t, history2, v.attesterHistoryByPubKey[pubKey2], "Unexpected retrieved history")
|
||||
}
|
||||
|
||||
func TestSaveProtections_OK(t *testing.T) {
|
||||
pubKey1 := [48]byte{1}
|
||||
pubKey2 := [48]byte{2}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
client := mock.NewMockBeaconNodeValidatorClient(ctrl)
|
||||
db := dbTest.SetupDB(t, [][48]byte{pubKey1, pubKey2})
|
||||
ctx := context.Background()
|
||||
|
||||
cleanHistories, err := db.AttestationHistoryForPubKeysV2(context.Background(), [][48]byte{pubKey1, pubKey2})
|
||||
require.NoError(t, err)
|
||||
v := validator{
|
||||
db: db,
|
||||
validatorClient: client,
|
||||
attesterHistoryByPubKey: cleanHistories,
|
||||
}
|
||||
|
||||
sr := [32]byte{1}
|
||||
history1 := cleanHistories[pubKey1]
|
||||
newHist, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history1, 1, &kv.HistoryData{
|
||||
Source: 0,
|
||||
SigningRoot: sr[:],
|
||||
})
|
||||
require.NoError(t, err)
|
||||
history1 = newHist
|
||||
|
||||
sr2 := [32]byte{2}
|
||||
history2, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history1, 3, &kv.HistoryData{
|
||||
Source: 2,
|
||||
SigningRoot: sr2[:],
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
cleanHistories[pubKey1] = history1
|
||||
cleanHistories[pubKey2] = history2
|
||||
|
||||
v.attesterHistoryByPubKey = cleanHistories
|
||||
require.NoError(t, v.SaveProtections(context.Background()), "Could not update assignments")
|
||||
savedHistories, err := db.AttestationHistoryForPubKeysV2(context.Background(), [][48]byte{pubKey1, pubKey2})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.DeepEqual(t, history1, savedHistories[pubKey1], "Unexpected retrieved history")
|
||||
require.DeepEqual(t, history2, savedHistories[pubKey2], "Unexpected retrieved history")
|
||||
}
|
||||
|
||||
func TestSaveProtection_OK(t *testing.T) {
|
||||
pubKey1 := [48]byte{1}
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
@@ -189,7 +189,8 @@ func (store *Store) AttestationHistoryForPubKeysV2(ctx context.Context, publicKe
|
||||
if len(enc) == 0 {
|
||||
attestationHistory = NewAttestationHistoryArray(0)
|
||||
} else {
|
||||
attestationHistory = enc
|
||||
attestationHistory = make(EncHistoryData, len(enc))
|
||||
copy(attestationHistory, enc)
|
||||
}
|
||||
attestationHistoryForVals[key] = attestationHistory
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user