Validator: Safer pending attestation records flushing (#8433)

* Add a debug log to show duration

* Merge from dev

* Use safe pending attestation records struct

* Fix build, use atomic bool

* Add deadline checks to CheckSlashableAttestation

* Go fmt

* Add test for in-progress log

* GoDocs

* Rename pending attestation records to queued attestation records

* Rename and add commentary on log

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Author: Preston Van Loon (committed by GitHub)
Date: 2021-02-12 14:19:01 -06:00
parent 473172ca8b
commit e2c5ae53e7
4 changed files with 122 additions and 20 deletions


@@ -21,6 +21,7 @@ go_library(
     visibility = ["//validator:__subpackages__"],
     deps = [
         "//beacon-chain/core/helpers:go_default_library",
+        "//shared/abool:go_default_library",
         "//shared/bytesutil:go_default_library",
         "//shared/event:go_default_library",
         "//shared/fileutil:go_default_library",


@@ -3,6 +3,7 @@ package kv
 import (
     "context"
     "fmt"
+    "sync"
     "time"

     "github.com/pkg/errors"
@@ -26,6 +27,45 @@ type AttestationRecord struct {
     SigningRoot [32]byte
 }
+
+// NewQueuedAttestationRecords constructor allocates the underlying slice and
+// required attributes for managing pending attestation records.
+func NewQueuedAttestationRecords() *QueuedAttestationRecords {
+    return &QueuedAttestationRecords{
+        records: make([]*AttestationRecord, 0, attestationBatchCapacity),
+    }
+}
+
+// QueuedAttestationRecords is a thread-safe struct for managing a queue of
+// attestation records to save to validator database.
+type QueuedAttestationRecords struct {
+    records []*AttestationRecord
+    lock    sync.RWMutex
+}
+
+// Append a new attestation record to the queue.
+func (p *QueuedAttestationRecords) Append(ar *AttestationRecord) {
+    p.lock.Lock()
+    defer p.lock.Unlock()
+    p.records = append(p.records, ar)
+}
+
+// Flush all records. This method returns the current pending records and resets
+// the pending records slice.
+func (p *QueuedAttestationRecords) Flush() []*AttestationRecord {
+    p.lock.Lock()
+    defer p.lock.Unlock()
+    recs := p.records
+    p.records = make([]*AttestationRecord, 0, attestationBatchCapacity)
+    return recs
+}
+
+// Len returns the current length of records.
+func (p *QueuedAttestationRecords) Len() int {
+    p.lock.RLock()
+    defer p.lock.RUnlock()
+    return len(p.records)
+}
+
 // A wrapper over an error received from a background routine
 // saving batched attestations for slashing protection.
 // This wrapper allows us to send this response over event feeds,
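The new type replaces the bare `[]*AttestationRecord` slice so the producer loop and the flush path can no longer race on the same backing array. A minimal, self-contained sketch of the same pattern, using simplified stand-in types rather than the Prysm code itself:

```go
package main

import (
	"fmt"
	"sync"
)

// record stands in for *AttestationRecord in this sketch.
type record struct{ target uint64 }

// queue mirrors QueuedAttestationRecords: a slice guarded by a mutex.
type queue struct {
	mu      sync.RWMutex
	records []*record
}

// append is safe to call from many goroutines at once.
func (q *queue) append(r *record) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.records = append(q.records, r)
}

// flush hands back the accumulated records and resets the slice under
// the same lock, so no concurrent append can fall between the two steps.
func (q *queue) flush() []*record {
	q.mu.Lock()
	defer q.mu.Unlock()
	recs := q.records
	q.records = nil
	return recs
}

func main() {
	q := &queue{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			q.append(&record{target: uint64(i)})
		}(i)
	}
	wg.Wait()
	fmt.Println(len(q.flush())) // always 100; no appends are lost
}
```

Because Flush swaps the slice out under the same lock Append takes, every record lands in exactly one flushed batch, which is the invariant the old bare-slice code could not guarantee.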
@@ -97,6 +137,9 @@ func (s *Store) CheckSlashableAttestation(
     defer span.End()
     var slashKind SlashingKind
     err := s.view(func(tx *bolt.Tx) error {
+        if ctx.Err() != nil {
+            return ctx.Err()
+        }
         bucket := tx.Bucket(pubKeysBucket)
         pkBucket := bucket.Bucket(pubKey[:])
         if pkBucket == nil {
@@ -124,6 +167,10 @@ func (s *Store) CheckSlashableAttestation(
         }
         // Check for surround votes.
         return sourceEpochsBucket.ForEach(func(sourceEpochBytes []byte, targetEpochsBytes []byte) error {
+            if ctx.Err() != nil {
+                return ctx.Err()
+            }
             existingSourceEpoch := bytesutil.BytesToEpochBigEndian(sourceEpochBytes)
             // There can be multiple target epochs attested per source epoch.
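Both new ctx.Err() checks apply the standard Go pattern for keeping a long bucket scan within the caller's deadline: polling the context once per iteration is nearly free, and it lets a bounded RPC abort the walk early instead of holding the read transaction open. A runnable sketch of the idea, using a hypothetical `scan` helper rather than any Prysm API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// scan mimics the ForEach callback: it checks ctx.Err() on every
// iteration so the loop stops promptly once the deadline passes.
func scan(ctx context.Context, n int) error {
	for i := 0; i < n; i++ {
		if err := ctx.Err(); err != nil {
			return err // context.DeadlineExceeded or context.Canceled
		}
		time.Sleep(time.Millisecond) // stand-in for per-record work
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	// Without the per-iteration check, this call would run for ~17 minutes.
	fmt.Println(scan(ctx, 1_000_000)) // context deadline exceeded
}
```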
@@ -234,19 +281,23 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
     for {
         select {
         case v := <-s.batchedAttestationsChan:
-            s.batchedAttestations = append(s.batchedAttestations, v)
-            if len(s.batchedAttestations) == attestationBatchCapacity {
-                log.WithField("numRecords", attestationBatchCapacity).Debug(
+            s.batchedAttestations.Append(v)
+            if numRecords := s.batchedAttestations.Len(); numRecords >= attestationBatchCapacity {
+                log.WithField("numRecords", numRecords).Debug(
                     "Reached max capacity of batched attestation records, flushing to DB",
                 )
-                s.flushAttestationRecords(ctx)
+                if s.batchedAttestationsFlushInProgress.IsNotSet() {
+                    s.flushAttestationRecords(ctx, s.batchedAttestations.Flush())
+                }
             }
         case <-ticker.C:
-            if len(s.batchedAttestations) > 0 {
-                log.WithField("numRecords", len(s.batchedAttestations)).Debug(
+            if numRecords := s.batchedAttestations.Len(); numRecords > 0 {
+                log.WithField("numRecords", numRecords).Debug(
                     "Batched attestation records write interval reached, flushing to DB",
                 )
-                s.flushAttestationRecords(ctx)
+                if s.batchedAttestationsFlushInProgress.IsNotSet() {
+                    s.flushAttestationRecords(ctx, s.batchedAttestations.Flush())
+                }
             }
         case <-ctx.Done():
             return
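Two behavioral details in this loop are easy to miss: the capacity check changed from == to >=, so records that accumulate while a flush is running still trigger a flush on a later receive, and both flush sites are now gated on the in-progress flag, with the ticker case acting as the catch-all for anything a skipped flush leaves behind. A self-contained sketch of the same channel/ticker batching shape, with illustrative names and no database:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batcher drains items from in, flushing when the batch fills up or when
// the ticker fires, mirroring the shape of batchAttestationWrites.
func batcher(ctx context.Context, in <-chan int, capacity int, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]int, 0, capacity)
	flush := func(reason string) {
		if len(batch) == 0 {
			return
		}
		fmt.Printf("flushing %d items (%s)\n", len(batch), reason)
		batch = batch[:0]
	}
	for {
		select {
		case v := <-in:
			batch = append(batch, v)
			if len(batch) >= capacity { // >=, not ==, tolerates overshoot
				flush("capacity")
			}
		case <-ticker.C:
			flush("interval")
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	in := make(chan int, 16)
	go batcher(ctx, in, 5, 20*time.Millisecond)
	for i := 0; i < 12; i++ {
		in <- i
	}
	<-ctx.Done() // two capacity flushes, then the ticker flushes the last 2
}
```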
@@ -258,13 +309,27 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
 // and resets the list of batched attestations for future writes.
 // This function notifies all subscribers for flushed attestations
 // of the result of the save operation.
-func (s *Store) flushAttestationRecords(ctx context.Context) {
-    err := s.saveAttestationRecords(ctx, s.batchedAttestations)
-    // If there was no error, we reset the batched attestations slice.
+func (s *Store) flushAttestationRecords(ctx context.Context, records []*AttestationRecord) {
+    if s.batchedAttestationsFlushInProgress.IsSet() {
+        // This should never happen. This method should not be called when a flush is already in
+        // progress. If you are seeing this log, check the atomic bool before calling this method.
+        log.Error("Attempted to flush attestation records when already in progress")
+        return
+    }
+    s.batchedAttestationsFlushInProgress.Set()
+    defer s.batchedAttestationsFlushInProgress.UnSet()
+
+    start := time.Now()
+    err := s.saveAttestationRecords(ctx, records)
+    // If there was any error, retry the records since the TX would have been reverted.
     if err == nil {
-        s.batchedAttestations = make([]*AttestationRecord, 0, attestationBatchCapacity)
+        log.WithField("duration", time.Since(start)).Debug("Successfully flushed batched attestations to DB")
+    } else {
+        // This should never happen.
+        log.WithError(err).Error("Failed to batch save attestation records, retrying in queue")
+        for _, ar := range records {
+            s.batchedAttestations.Append(ar)
+        }
     }
     // Forward the error, if any, to all subscribers via an event feed.
     // We use a struct wrapper around the error as the event feed
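The guard relies on Prysm's shared/abool package, whose IsSet/IsNotSet/Set/UnSet calls appear above. That is defensive rather than load-bearing here, since the batching goroutine shown above already checks IsNotSet before calling; if multiple goroutines could trigger flushes, the separate check-then-set steps would themselves race, and a compare-and-swap would be the right primitive. A sketch of that stricter variant using the standard library's atomic.Bool (Go 1.19+, which postdates this PR; shared/abool filled the same role at the time):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// flusher allows at most one flush at a time; late callers return early.
type flusher struct {
	inProgress atomic.Bool  // stand-in for shared/abool.AtomicBool
	flushes    atomic.Int64 // counts completed flushes
}

func (f *flusher) flush() {
	// CompareAndSwap folds "is it set?" and "set it" into one atomic step,
	// closing the window a separate IsSet check would leave open.
	if !f.inProgress.CompareAndSwap(false, true) {
		return // another flush is already in progress
	}
	defer f.inProgress.Store(false)
	f.flushes.Add(1) // stand-in for the real batched DB write
}

func main() {
	f := &flusher{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.flush()
		}()
	}
	wg.Wait()
	fmt.Println("completed flushes:", f.flushes.Load()) // between 1 and 8
}
```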


@@ -8,7 +8,7 @@ import (
"sync"
"testing"
"github.com/prysmaticlabs/eth2-types"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -17,6 +17,31 @@ import (
bolt "go.etcd.io/bbolt"
)
func TestPendingAttestationRecords_Flush(t *testing.T) {
queue := NewQueuedAttestationRecords()
// Add 5 atts
num := 5
for i := 0; i < num; i++ {
queue.Append(&AttestationRecord{
Target: types.Epoch(i),
})
}
res := queue.Flush()
assert.Equal(t, len(res), num, "Wrong number of flushed attestations")
assert.Equal(t, len(queue.records), 0, "Records were not cleared/flushed")
}
func TestPendingAttestationRecords_Len(t *testing.T) {
queue := NewQueuedAttestationRecords()
assert.Equal(t, queue.Len(), 0)
queue.Append(&AttestationRecord{})
assert.Equal(t, queue.Len(), 1)
queue.Flush()
assert.Equal(t, queue.Len(), 0)
}
func TestStore_CheckSlashableAttestation_DoubleVote(t *testing.T) {
ctx := context.Background()
numValidators := 1
@@ -372,7 +397,7 @@ func TestSaveAttestationForPubKey_BatchWrites_FullCapacity(t *testing.T) {
     require.LogsContain(t, hook, "Reached max capacity of batched attestation records")
     require.LogsDoNotContain(t, hook, "Batched attestation records write interval reached")
     require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
-    require.Equal(t, 0, len(validatorDB.batchedAttestations))
+    require.Equal(t, 0, validatorDB.batchedAttestations.Len())

     // We then verify all the data we wanted to save is indeed saved to disk.
     err := validatorDB.view(func(tx *bolt.Tx) error {
@@ -429,7 +454,7 @@ func TestSaveAttestationForPubKey_BatchWrites_LowCapacity_TimerReached(t *testin
     require.LogsDoNotContain(t, hook, "Reached max capacity of batched attestation records")
     require.LogsContain(t, hook, "Batched attestation records write interval reached")
     require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
-    require.Equal(t, 0, len(validatorDB.batchedAttestations))
+    require.Equal(t, 0, validatorDB.batchedAttestations.Len())

     // We then verify all the data we wanted to save is indeed saved to disk.
     err := validatorDB.view(func(tx *bolt.Tx) error {
@@ -540,3 +565,12 @@ func createAttestation(source, target types.Epoch) *ethpb.IndexedAttestation {
         },
     }
 }
+
+func TestStore_flushAttestationRecords_InProgress(t *testing.T) {
+    s := &Store{}
+    s.batchedAttestationsFlushInProgress.Set()
+
+    hook := logTest.NewGlobal()
+    s.flushAttestationRecords(context.Background(), nil)
+    assert.LogsContain(t, hook, "Attempted to flush attestation records when already in progress")
+}


@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/shared/abool"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -50,11 +51,12 @@ var blockedBuckets = [][]byte{
 // Store defines an implementation of the Prysm Database interface
 // using BoltDB as the underlying persistent kv-store for eth2.
 type Store struct {
-    db                           *bolt.DB
-    databasePath                 string
-    batchedAttestations          []*AttestationRecord
-    batchedAttestationsChan      chan *AttestationRecord
-    batchAttestationsFlushedFeed *event.Feed
+    db                                 *bolt.DB
+    databasePath                       string
+    batchedAttestations                *QueuedAttestationRecords
+    batchedAttestationsChan            chan *AttestationRecord
+    batchAttestationsFlushedFeed       *event.Feed
+    batchedAttestationsFlushInProgress abool.AtomicBool
 }

 // Close closes the underlying boltdb database.
@@ -118,7 +120,7 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
     kv := &Store{
         db:                           boltDB,
         databasePath:                 dirPath,
-        batchedAttestations:          make([]*AttestationRecord, 0, attestationBatchCapacity),
+        batchedAttestations:          NewQueuedAttestationRecords(),
         batchedAttestationsChan:      make(chan *AttestationRecord, attestationBatchCapacity),
         batchAttestationsFlushedFeed: new(event.Feed),
     }