From 5ed52b1e4465f2998dfe686bd1e0eb588e1117ff Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Tue, 13 Dec 2022 18:06:32 -0600 Subject: [PATCH] Validator: Add trace span information for AttestationRecord save requests (#11742) * Add additional span information when saving attestation records * Remove keymanager.sign span. It's very noisy and not a helpful data point. Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- validator/db/kv/attester_protection.go | 40 +++++++++++++++++++----- validator/db/kv/db.go | 4 +-- validator/keymanager/local/keymanager.go | 3 -- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/validator/db/kv/attester_protection.go b/validator/db/kv/attester_protection.go index 92966586b8..9fed67376d 100644 --- a/validator/db/kv/attester_protection.go +++ b/validator/db/kv/attester_protection.go @@ -20,6 +20,13 @@ import ( // SlashingKind used for helpful information upon detection. type SlashingKind int +// AttestationRecordSaveRequest includes the attestation record to save along +// with the appropriate call context. +type AttestationRecordSaveRequest struct { + ctx context.Context + record *AttestationRecord +} + // AttestationRecord which can be represented by these simple values // for manipulation by database methods. type AttestationRecord struct { @@ -304,17 +311,23 @@ func (s *Store) SaveAttestationForPubKey( ) error { ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationForPubKey") defer span.End() - s.batchedAttestationsChan <- &AttestationRecord{ - PubKey: pubKey, - Source: att.Data.Source.Epoch, - Target: att.Data.Target.Epoch, - SigningRoot: signingRoot, + s.batchedAttestationsChan <- &AttestationRecordSaveRequest{ + ctx: ctx, + record: &AttestationRecord{ + PubKey: pubKey, + Source: att.Data.Source.Epoch, + Target: att.Data.Target.Epoch, + SigningRoot: signingRoot, + }, } + // Subscribe to be notified when the attestation record queued // for saving to the DB is indeed saved. If an error occurred // during the process of saving the attestation record, the sender // will give us that error. We use a buffered channel // to prevent blocking the sender from notifying us of the result. + _, innerSpan := trace.StartSpan(ctx, "Validator.SaveAttestationForPubKey.WaitForResponse") + defer innerSpan.End() responseChan := make(chan saveAttestationsResponse, 1) defer close(responseChan) sub := s.batchAttestationsFlushedFeed.Subscribe(responseChan) @@ -335,15 +348,24 @@ func (s *Store) batchAttestationWrites(ctx context.Context) { for { select { case v := <-s.batchedAttestationsChan: - s.batchedAttestations.Append(v) + _, span := trace.StartSpan(v.ctx, "batchAttestationWrites.handleBatchedAttestationSaveRequest") + s.batchedAttestations.Append(v.record) + + span.AddAttributes(trace.Int64Attribute("num_records", int64(s.batchedAttestations.Len()))) + if numRecords := s.batchedAttestations.Len(); numRecords >= attestationBatchCapacity { log.WithField("numRecords", numRecords).Debug( "Reached max capacity of batched attestation records, flushing to DB", ) if s.batchedAttestationsFlushInProgress.IsNotSet() { - s.flushAttestationRecords(ctx, s.batchedAttestations.Flush()) + // Create a new context with the span information from the chan. This is to + // prevent any context deadlines from the caller while maintaining the trace + // relationships. + ctx2 := trace.NewContext(ctx, span) + s.flushAttestationRecords(ctx2, s.batchedAttestations.Flush()) } } + span.End() case <-ticker.C: if numRecords := s.batchedAttestations.Len(); numRecords > 0 { log.WithField("numRecords", numRecords).Debug( @@ -364,6 +386,9 @@ func (s *Store) batchAttestationWrites(ctx context.Context) { // This function notifies all subscribers for flushed attestations // of the result of the save operation. func (s *Store) flushAttestationRecords(ctx context.Context, records []*AttestationRecord) { + ctx, span := trace.StartSpan(ctx, "validatorDB.flushAttestationRecords") + defer span.End() + if s.batchedAttestationsFlushInProgress.IsSet() { // This should never happen. This method should not be called when a flush is already in // progress. If you are seeing this log, check the atomic bool before calling this method. @@ -381,6 +406,7 @@ func (s *Store) flushAttestationRecords(ctx context.Context, records []*Attestat } else { // This should never happen. log.WithError(err).Error("Failed to batch save attestation records, retrying in queue") + tracing.AnnotateError(span, err) for _, ar := range records { s.batchedAttestations.Append(ar) } diff --git a/validator/db/kv/db.go b/validator/db/kv/db.go index 8aac257ece..5c409e7571 100644 --- a/validator/db/kv/db.go +++ b/validator/db/kv/db.go @@ -64,7 +64,7 @@ type Store struct { db *bolt.DB databasePath string batchedAttestations *QueuedAttestationRecords - batchedAttestationsChan chan *AttestationRecord + batchedAttestationsChan chan *AttestationRecordSaveRequest batchAttestationsFlushedFeed *event.Feed batchedAttestationsFlushInProgress abool.AtomicBool } @@ -134,7 +134,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er db: boltDB, databasePath: dirPath, batchedAttestations: NewQueuedAttestationRecords(), - batchedAttestationsChan: make(chan *AttestationRecord, attestationBatchCapacity), + batchedAttestationsChan: make(chan *AttestationRecordSaveRequest, attestationBatchCapacity), batchAttestationsFlushedFeed: new(event.Feed), } diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index 72af81adda..45b94383d1 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -200,9 +200,6 @@ func (km *Keymanager) FetchValidatingPrivateKeys(ctx context.Context) ([][32]byt // Sign signs a message using a validator key. func (_ *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (bls.Signature, error) { - ctx, span := trace.StartSpan(ctx, "keymanager.Sign") - defer span.End() - publicKey := req.PublicKey if publicKey == nil { return nil, errors.New("nil public key in request")