Add Spans To Attestation Caches (#8556)

* add spans

* preston's comments

* add span
This commit is contained in:
Nishant Das
2021-03-05 23:17:27 +08:00
committed by GitHub
parent 32f2f711db
commit 067a519b37
9 changed files with 65 additions and 40 deletions

View File

@@ -25,6 +25,7 @@ go_library(
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

View File

@@ -1,6 +1,8 @@
package kv
import (
"context"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -8,29 +10,37 @@ import (
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
attaggregation "github.com/prysmaticlabs/prysm/shared/aggregation/attestations"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// AggregateUnaggregatedAttestations aggregates the unaggregated attestations and saves the
// newly aggregated attestations in the pool.
// It tracks the unaggregated attestations that weren't able to aggregate to prevent
// the deletion of unaggregated attestations in the pool.
func (c *AttCaches) AggregateUnaggregatedAttestations() error {
func (c *AttCaches) AggregateUnaggregatedAttestations(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestations")
defer span.End()
unaggregatedAtts, err := c.UnaggregatedAttestations()
if err != nil {
return err
}
return c.aggregateUnaggregatedAttestations(unaggregatedAtts)
return c.aggregateUnaggregatedAttestations(ctx, unaggregatedAtts)
}
// AggregateUnaggregatedAttestationsBySlotIndex aggregates the unaggregated attestations and saves
// newly aggregated attestations in the pool. Unaggregated attestations are filtered by slot and
// committee index.
func (c *AttCaches) AggregateUnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) error {
unaggregatedAtts := c.UnaggregatedAttestationsBySlotIndex(slot, committeeIndex)
return c.aggregateUnaggregatedAttestations(unaggregatedAtts)
func (c *AttCaches) AggregateUnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestationsBySlotIndex")
defer span.End()
unaggregatedAtts := c.UnaggregatedAttestationsBySlotIndex(ctx, slot, committeeIndex)
return c.aggregateUnaggregatedAttestations(ctx, unaggregatedAtts)
}
func (c *AttCaches) aggregateUnaggregatedAttestations(unaggregatedAtts []*ethpb.Attestation) error {
func (c *AttCaches) aggregateUnaggregatedAttestations(ctx context.Context, unaggregatedAtts []*ethpb.Attestation) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.aggregateUnaggregatedAttestations")
defer span.End()
attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(unaggregatedAtts))
for _, att := range unaggregatedAtts {
attDataRoot, err := att.Data.HashTreeRoot()
@@ -158,7 +168,10 @@ func (c *AttCaches) AggregatedAttestations() []*ethpb.Attestation {
// AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
func (c *AttCaches) AggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndex")
defer span.End()
atts := make([]*ethpb.Attestation, 0)
c.aggregatedAttLock.RLock()

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"sort"
"testing"
@@ -31,10 +32,10 @@ func TestKV_Aggregated_AggregateUnaggregatedAttestations(t *testing.T) {
att8 := testutil.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()})
atts := []*ethpb.Attestation{att1, att2, att3, att4, att5, att6, att7, att8}
require.NoError(t, cache.SaveUnaggregatedAttestations(atts))
require.NoError(t, cache.AggregateUnaggregatedAttestations())
require.NoError(t, cache.AggregateUnaggregatedAttestations(context.Background()))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(1, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(2, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(context.Background(), 1, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(context.Background(), 2, 0)), "Did not aggregate correctly")
}
func TestKV_Aggregated_AggregateUnaggregatedAttestationsBySlotIndex(t *testing.T) {
@@ -63,32 +64,33 @@ func TestKV_Aggregated_AggregateUnaggregatedAttestationsBySlotIndex(t *testing.T
{AggregationBits: bitfield.Bitlist{0b1010}, Data: genData(2, 3), Signature: genSign()},
{AggregationBits: bitfield.Bitlist{0b1100}, Data: genData(2, 4), Signature: genSign()},
}
ctx := context.Background()
// Make sure that no error is produced if aggregation is requested on empty unaggregated list.
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(2, 3))
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 2)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 3)), "Did not aggregate correctly")
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 2, 3))
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 3)), "Did not aggregate correctly")
// Persist unaggregated attestations, and aggregate on per slot/committee index base.
require.NoError(t, cache.SaveUnaggregatedAttestations(atts))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(2, 3))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 2, 3))
// Committee attestations at a slot should be aggregated.
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 2)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)), "Did not aggregate correctly")
// Committee attestations haven't been aggregated.
require.Equal(t, 2, len(cache.UnaggregatedAttestationsBySlotIndex(1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 3)), "Did not aggregate correctly")
require.Equal(t, 2, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 3)), "Did not aggregate correctly")
// Committee at a second slot is aggregated.
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(2, 3)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(2, 3)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 3)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(ctx, 2, 3)), "Did not aggregate correctly")
// The second committee at second slot is not aggregated.
require.Equal(t, 1, len(cache.UnaggregatedAttestationsBySlotIndex(2, 4)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(2, 4)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 4)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 2, 4)), "Did not aggregate correctly")
}
func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {

View File

@@ -1,11 +1,14 @@
package kv
import (
"context"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"go.opencensus.io/trace"
)
// SaveUnaggregatedAttestation saves an unaggregated attestation in cache.
@@ -68,7 +71,10 @@ func (c *AttCaches) UnaggregatedAttestations() ([]*ethpb.Attestation, error) {
// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.UnaggregatedAttestationsBySlotIndex")
defer span.End()
atts := make([]*ethpb.Attestation, 0)
c.unAggregateAttLock.RLock()

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"sort"
"testing"
@@ -242,11 +243,11 @@ func TestKV_Unaggregated_UnaggregatedAttestationsBySlotIndex(t *testing.T) {
for _, att := range atts {
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
}
returned := cache.UnaggregatedAttestationsBySlotIndex(1, 1)
ctx := context.Background()
returned := cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
returned = cache.UnaggregatedAttestationsBySlotIndex(1, 2)
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
returned = cache.UnaggregatedAttestationsBySlotIndex(2, 1)
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
}

View File

@@ -1,6 +1,8 @@
package attestations
import (
"context"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations/kv"
@@ -12,12 +14,12 @@ import (
// aggregator actor.
type Pool interface {
// For Aggregated attestations
AggregateUnaggregatedAttestations() error
AggregateUnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) error
AggregateUnaggregatedAttestations(ctx context.Context) error
AggregateUnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) error
SaveAggregatedAttestation(att *ethpb.Attestation) error
SaveAggregatedAttestations(atts []*ethpb.Attestation) error
AggregatedAttestations() []*ethpb.Attestation
AggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
AggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
DeleteAggregatedAttestation(att *ethpb.Attestation) error
HasAggregatedAttestation(att *ethpb.Attestation) (bool, error)
AggregatedAttestationCount() int
@@ -25,7 +27,7 @@ type Pool interface {
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestations() ([]*ethpb.Attestation, error)
UnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
DeleteSeenUnaggregatedAttestations() (int, error)
UnaggregatedAttestationCount() int

View File

@@ -43,7 +43,7 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
defer span.End()
if err := s.pool.AggregateUnaggregatedAttestations(); err != nil {
if err := s.pool.AggregateUnaggregatedAttestations(ctx); err != nil {
return err
}
atts := append(s.pool.AggregatedAttestations(), s.pool.BlockAttestations()...)

View File

@@ -100,7 +100,7 @@ func TestBatchAttestations_Multiple(t *testing.T) {
require.NoError(t, err)
wanted = append(wanted, aggregated...)
require.NoError(t, s.pool.AggregateUnaggregatedAttestations())
require.NoError(t, s.pool.AggregateUnaggregatedAttestations(context.Background()))
received := s.pool.ForkchoiceAttestations()
sort.Slice(received, func(i, j int) bool {

View File

@@ -59,14 +59,14 @@ func (vs *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
}
if err := vs.AttPool.AggregateUnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex); err != nil {
if err := vs.AttPool.AggregateUnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex); err != nil {
return nil, status.Errorf(codes.Internal, "Could not aggregate unaggregated attestations")
}
aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
if len(aggregatedAtts) == 0 {
return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
}