Compare commits

...

2 Commits

Author SHA1 Message Date
james-prysm
edcf9e86c5 changelog and ci complaints 2026-01-08 13:52:38 -06:00
james-prysm
2e8d579d31 adding cache for attestation data so we don't call it multiple times 2026-01-08 13:38:06 -06:00
4 changed files with 260 additions and 9 deletions

View File

@@ -0,0 +1,3 @@
### Changed
- post electra we now call attestation data once per slot and use a cache for subsequent requests

View File

@@ -71,17 +71,9 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}
committeeIndex := duty.CommitteeIndex
postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
if postElectra {
committeeIndex = 0
}
req := &ethpb.AttestationDataRequest{
Slot: slot,
CommitteeIndex: committeeIndex,
}
data, err := v.validatorClient.AttestationData(ctx, req)
data, err := v.getAttestationData(ctx, slot, duty.CommitteeIndex)
if err != nil {
log.WithError(err).Error("Could not request attestation to sign at slot")
if v.emitAccountMetrics {

View File

@@ -112,6 +112,9 @@ type validator struct {
blacklistedPubkeysLock sync.RWMutex
attSelectionLock sync.Mutex
dutiesLock sync.RWMutex
attestationDataCacheLock sync.RWMutex
attestationDataCache *ethpb.AttestationData
attestationDataCacheSlot primitives.Slot
disableDutiesPolling bool
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
eventsChannel chan *eventClient.Event
@@ -977,6 +980,55 @@ func (v *validator) domainData(ctx context.Context, epoch primitives.Epoch, doma
return res, nil
}
// getAttestationData fetches attestation data from the beacon node with caching for post-Electra.
// Post-Electra, attestation data is identical for all validators in the same slot (committee index is always 0),
// so we cache it to avoid redundant beacon node requests.
func (v *validator) getAttestationData(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) (*ethpb.AttestationData, error) {
ctx, span := trace.StartSpan(ctx, "validator.getAttestationData")
defer span.End()
postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
// Pre-Electra: no caching since committee index varies per validator
if !postElectra {
return v.validatorClient.AttestationData(ctx, &ethpb.AttestationDataRequest{
Slot: slot,
CommitteeIndex: committeeIndex,
})
}
// Post-Electra: check cache first (committee index is always 0)
v.attestationDataCacheLock.RLock()
if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
data := v.attestationDataCache
v.attestationDataCacheLock.RUnlock()
return data, nil
}
v.attestationDataCacheLock.RUnlock()
// Cache miss - acquire write lock and fetch
v.attestationDataCacheLock.Lock()
defer v.attestationDataCacheLock.Unlock()
// Double-check after acquiring write lock (another goroutine may have filled the cache)
if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
return v.attestationDataCache, nil
}
data, err := v.validatorClient.AttestationData(ctx, &ethpb.AttestationDataRequest{
Slot: slot,
CommitteeIndex: 0,
})
if err != nil {
return nil, err
}
v.attestationDataCache = data
v.attestationDataCacheSlot = slot
return data, nil
}
func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.ValidatorDuty, nextEpochDuties []*ethpb.ValidatorDuty) {
attesterKeys := make([][]string, params.BeaconConfig().SlotsPerEpoch)
for i := range attesterKeys {

View File

@@ -2977,3 +2977,207 @@ func TestValidator_CheckDependentRoots(t *testing.T) {
require.NoError(t, v.checkDependentRoots(ctx, head))
})
}
func TestGetAttestationData_PreElectraNoCaching(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
v := &validator{validatorClient: client}
// Pre-Electra slot (Electra fork epoch is far in the future by default)
preElectraSlot := primitives.Slot(10)
expectedData := &ethpb.AttestationData{
Slot: preElectraSlot,
CommitteeIndex: 5,
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
Source: &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
Target: &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
}
// Each call should go to the beacon node (no caching pre-Electra)
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: preElectraSlot,
CommitteeIndex: 5,
}).Return(expectedData, nil)
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: preElectraSlot,
CommitteeIndex: 7,
}).Return(expectedData, nil)
// First call with committee index 5
data1, err := v.getAttestationData(context.Background(), preElectraSlot, 5)
require.NoError(t, err)
require.DeepEqual(t, expectedData, data1)
// Second call with different committee index 7 - should still call beacon node
data2, err := v.getAttestationData(context.Background(), preElectraSlot, 7)
require.NoError(t, err)
require.DeepEqual(t, expectedData, data2)
}
func TestGetAttestationData_PostElectraCaching(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Set up Electra fork epoch for this test
cfg := params.BeaconConfig().Copy()
originalElectraForkEpoch := cfg.ElectraForkEpoch
cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
defer func() {
cfg.ElectraForkEpoch = originalElectraForkEpoch
params.OverrideBeaconConfig(cfg)
}()
client := validatormock.NewMockValidatorClient(ctrl)
v := &validator{validatorClient: client}
// Post-Electra slot
postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
expectedData := &ethpb.AttestationData{
Slot: postElectraSlot,
CommitteeIndex: 0,
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
Source: &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
Target: &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
}
// Only ONE call should go to the beacon node (caching post-Electra)
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: postElectraSlot,
CommitteeIndex: 0,
}).Return(expectedData, nil).Times(1)
// First call - should hit beacon node
data1, err := v.getAttestationData(context.Background(), postElectraSlot, 5)
require.NoError(t, err)
require.DeepEqual(t, expectedData, data1)
// Second call with different committee index - should use cache
data2, err := v.getAttestationData(context.Background(), postElectraSlot, 7)
require.NoError(t, err)
require.DeepEqual(t, expectedData, data2)
// Third call - should still use cache
data3, err := v.getAttestationData(context.Background(), postElectraSlot, 10)
require.NoError(t, err)
require.DeepEqual(t, expectedData, data3)
}
func TestGetAttestationData_PostElectraCacheInvalidatesOnNewSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Set up Electra fork epoch for this test
cfg := params.BeaconConfig().Copy()
originalElectraForkEpoch := cfg.ElectraForkEpoch
cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
defer func() {
cfg.ElectraForkEpoch = originalElectraForkEpoch
params.OverrideBeaconConfig(cfg)
}()
client := validatormock.NewMockValidatorClient(ctrl)
v := &validator{validatorClient: client}
slot1 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
slot2 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 6)
dataSlot1 := &ethpb.AttestationData{
Slot: slot1,
CommitteeIndex: 0,
BeaconBlockRoot: bytesutil.PadTo([]byte("root1"), 32),
Source: &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
Target: &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
}
dataSlot2 := &ethpb.AttestationData{
Slot: slot2,
CommitteeIndex: 0,
BeaconBlockRoot: bytesutil.PadTo([]byte("root2"), 32),
Source: &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
Target: &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
}
// Expect one call per slot
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: slot1,
CommitteeIndex: 0,
}).Return(dataSlot1, nil).Times(1)
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: slot2,
CommitteeIndex: 0,
}).Return(dataSlot2, nil).Times(1)
// First slot - should hit beacon node
data1, err := v.getAttestationData(context.Background(), slot1, 5)
require.NoError(t, err)
require.DeepEqual(t, dataSlot1, data1)
// Same slot - should use cache
data1Again, err := v.getAttestationData(context.Background(), slot1, 7)
require.NoError(t, err)
require.DeepEqual(t, dataSlot1, data1Again)
// New slot - should invalidate cache and hit beacon node
data2, err := v.getAttestationData(context.Background(), slot2, 5)
require.NoError(t, err)
require.DeepEqual(t, dataSlot2, data2)
}
func TestGetAttestationData_PostElectraConcurrentAccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Set up Electra fork epoch for this test
cfg := params.BeaconConfig().Copy()
originalElectraForkEpoch := cfg.ElectraForkEpoch
cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
defer func() {
cfg.ElectraForkEpoch = originalElectraForkEpoch
params.OverrideBeaconConfig(cfg)
}()
client := validatormock.NewMockValidatorClient(ctrl)
v := &validator{validatorClient: client}
postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
expectedData := &ethpb.AttestationData{
Slot: postElectraSlot,
CommitteeIndex: 0,
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
Source: &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
Target: &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
}
// Should only call beacon node once despite concurrent requests
client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
Slot: postElectraSlot,
CommitteeIndex: 0,
}).Return(expectedData, nil).Times(1)
var wg sync.WaitGroup
numGoroutines := 10
results := make([]*ethpb.AttestationData, numGoroutines)
errs := make([]error, numGoroutines)
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
results[idx], errs[idx] = v.getAttestationData(context.Background(), postElectraSlot, primitives.CommitteeIndex(idx))
}(i)
}
wg.Wait()
for i := 0; i < numGoroutines; i++ {
require.NoError(t, errs[i])
require.DeepEqual(t, expectedData, results[i])
}
}