Validator: Add trace span information for AttestationRecord save requests (#11742)

* Add additional span information when saving attestation records

* Remove keymanager.sign span. It's very noisy and not a helpful data point.

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Preston Van Loon
2022-12-13 18:06:32 -06:00
committed by GitHub
parent 2e49fdb3d2
commit 5ed52b1e44
3 changed files with 35 additions and 12 deletions

View File

@@ -20,6 +20,13 @@ import (
// SlashingKind used for helpful information upon detection.
type SlashingKind int
// AttestationRecordSaveRequest includes the attestation record to save along
// with the appropriate call context.
type AttestationRecordSaveRequest struct {
ctx context.Context
record *AttestationRecord
}
// AttestationRecord which can be represented by these simple values
// for manipulation by database methods.
type AttestationRecord struct {
@@ -304,17 +311,23 @@ func (s *Store) SaveAttestationForPubKey(
) error {
ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationForPubKey")
defer span.End()
s.batchedAttestationsChan <- &AttestationRecord{
PubKey: pubKey,
Source: att.Data.Source.Epoch,
Target: att.Data.Target.Epoch,
SigningRoot: signingRoot,
s.batchedAttestationsChan <- &AttestationRecordSaveRequest{
ctx: ctx,
record: &AttestationRecord{
PubKey: pubKey,
Source: att.Data.Source.Epoch,
Target: att.Data.Target.Epoch,
SigningRoot: signingRoot,
},
}
// Subscribe to be notified when the attestation record queued
// for saving to the DB is indeed saved. If an error occurred
// during the process of saving the attestation record, the sender
// will give us that error. We use a buffered channel
// to prevent blocking the sender from notifying us of the result.
_, innerSpan := trace.StartSpan(ctx, "Validator.SaveAttestationForPubKey.WaitForResponse")
defer innerSpan.End()
responseChan := make(chan saveAttestationsResponse, 1)
defer close(responseChan)
sub := s.batchAttestationsFlushedFeed.Subscribe(responseChan)
@@ -335,15 +348,24 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
for {
select {
case v := <-s.batchedAttestationsChan:
s.batchedAttestations.Append(v)
_, span := trace.StartSpan(v.ctx, "batchAttestationWrites.handleBatchedAttestationSaveRequest")
s.batchedAttestations.Append(v.record)
span.AddAttributes(trace.Int64Attribute("num_records", int64(s.batchedAttestations.Len())))
if numRecords := s.batchedAttestations.Len(); numRecords >= attestationBatchCapacity {
log.WithField("numRecords", numRecords).Debug(
"Reached max capacity of batched attestation records, flushing to DB",
)
if s.batchedAttestationsFlushInProgress.IsNotSet() {
s.flushAttestationRecords(ctx, s.batchedAttestations.Flush())
// Create a new context with the span information from the chan. This is to
// prevent any context deadlines from the caller while maintaining the trace
// relationships.
ctx2 := trace.NewContext(ctx, span)
s.flushAttestationRecords(ctx2, s.batchedAttestations.Flush())
}
}
span.End()
case <-ticker.C:
if numRecords := s.batchedAttestations.Len(); numRecords > 0 {
log.WithField("numRecords", numRecords).Debug(
@@ -364,6 +386,9 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
// This function notifies all subscribers for flushed attestations
// of the result of the save operation.
func (s *Store) flushAttestationRecords(ctx context.Context, records []*AttestationRecord) {
ctx, span := trace.StartSpan(ctx, "validatorDB.flushAttestationRecords")
defer span.End()
if s.batchedAttestationsFlushInProgress.IsSet() {
// This should never happen. This method should not be called when a flush is already in
// progress. If you are seeing this log, check the atomic bool before calling this method.
@@ -381,6 +406,7 @@ func (s *Store) flushAttestationRecords(ctx context.Context, records []*Attestat
} else {
// This should never happen.
log.WithError(err).Error("Failed to batch save attestation records, retrying in queue")
tracing.AnnotateError(span, err)
for _, ar := range records {
s.batchedAttestations.Append(ar)
}

View File

@@ -64,7 +64,7 @@ type Store struct {
db *bolt.DB
databasePath string
batchedAttestations *QueuedAttestationRecords
batchedAttestationsChan chan *AttestationRecord
batchedAttestationsChan chan *AttestationRecordSaveRequest
batchAttestationsFlushedFeed *event.Feed
batchedAttestationsFlushInProgress abool.AtomicBool
}
@@ -134,7 +134,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
db: boltDB,
databasePath: dirPath,
batchedAttestations: NewQueuedAttestationRecords(),
batchedAttestationsChan: make(chan *AttestationRecord, attestationBatchCapacity),
batchedAttestationsChan: make(chan *AttestationRecordSaveRequest, attestationBatchCapacity),
batchAttestationsFlushedFeed: new(event.Feed),
}

View File

@@ -200,9 +200,6 @@ func (km *Keymanager) FetchValidatingPrivateKeys(ctx context.Context) ([][32]byt
// Sign signs a message using a validator key.
func (_ *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (bls.Signature, error) {
ctx, span := trace.StartSpan(ctx, "keymanager.Sign")
defer span.End()
publicKey := req.PublicKey
if publicKey == nil {
return nil, errors.New("nil public key in request")