mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-11 06:18:05 -05:00
Compare commits
2 Commits
process-ex
...
cache-atte
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
edcf9e86c5 | ||
|
|
2e8d579d31 |
3
changelog/james-prysm_cache-attestation-data.md
Normal file
3
changelog/james-prysm_cache-attestation-data.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Post-Electra, attestation data is now requested from the beacon node once per slot and cached for all subsequent requests in that slot
|
||||
@@ -71,17 +71,9 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
|
||||
return
|
||||
}
|
||||
|
||||
committeeIndex := duty.CommitteeIndex
|
||||
postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
|
||||
if postElectra {
|
||||
committeeIndex = 0
|
||||
}
|
||||
|
||||
req := ðpb.AttestationDataRequest{
|
||||
Slot: slot,
|
||||
CommitteeIndex: committeeIndex,
|
||||
}
|
||||
data, err := v.validatorClient.AttestationData(ctx, req)
|
||||
data, err := v.getAttestationData(ctx, slot, duty.CommitteeIndex)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not request attestation to sign at slot")
|
||||
if v.emitAccountMetrics {
|
||||
|
||||
@@ -112,6 +112,9 @@ type validator struct {
|
||||
blacklistedPubkeysLock sync.RWMutex
|
||||
attSelectionLock sync.Mutex
|
||||
dutiesLock sync.RWMutex
|
||||
attestationDataCacheLock sync.RWMutex
|
||||
attestationDataCache *ethpb.AttestationData
|
||||
attestationDataCacheSlot primitives.Slot
|
||||
disableDutiesPolling bool
|
||||
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
|
||||
eventsChannel chan *eventClient.Event
|
||||
@@ -977,6 +980,55 @@ func (v *validator) domainData(ctx context.Context, epoch primitives.Epoch, doma
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// getAttestationData fetches attestation data from the beacon node with caching for post-Electra.
|
||||
// Post-Electra, attestation data is identical for all validators in the same slot (committee index is always 0),
|
||||
// so we cache it to avoid redundant beacon node requests.
|
||||
func (v *validator) getAttestationData(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) (*ethpb.AttestationData, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.getAttestationData")
|
||||
defer span.End()
|
||||
|
||||
postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
|
||||
|
||||
// Pre-Electra: no caching since committee index varies per validator
|
||||
if !postElectra {
|
||||
return v.validatorClient.AttestationData(ctx, ðpb.AttestationDataRequest{
|
||||
Slot: slot,
|
||||
CommitteeIndex: committeeIndex,
|
||||
})
|
||||
}
|
||||
|
||||
// Post-Electra: check cache first (committee index is always 0)
|
||||
v.attestationDataCacheLock.RLock()
|
||||
if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
|
||||
data := v.attestationDataCache
|
||||
v.attestationDataCacheLock.RUnlock()
|
||||
return data, nil
|
||||
}
|
||||
v.attestationDataCacheLock.RUnlock()
|
||||
|
||||
// Cache miss - acquire write lock and fetch
|
||||
v.attestationDataCacheLock.Lock()
|
||||
defer v.attestationDataCacheLock.Unlock()
|
||||
|
||||
// Double-check after acquiring write lock (another goroutine may have filled the cache)
|
||||
if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
|
||||
return v.attestationDataCache, nil
|
||||
}
|
||||
|
||||
data, err := v.validatorClient.AttestationData(ctx, ðpb.AttestationDataRequest{
|
||||
Slot: slot,
|
||||
CommitteeIndex: 0,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v.attestationDataCache = data
|
||||
v.attestationDataCacheSlot = slot
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.ValidatorDuty, nextEpochDuties []*ethpb.ValidatorDuty) {
|
||||
attesterKeys := make([][]string, params.BeaconConfig().SlotsPerEpoch)
|
||||
for i := range attesterKeys {
|
||||
|
||||
@@ -2977,3 +2977,207 @@ func TestValidator_CheckDependentRoots(t *testing.T) {
|
||||
require.NoError(t, v.checkDependentRoots(ctx, head))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetAttestationData_PreElectraNoCaching(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
client := validatormock.NewMockValidatorClient(ctrl)
|
||||
v := &validator{validatorClient: client}
|
||||
|
||||
// Pre-Electra slot (Electra fork epoch is far in the future by default)
|
||||
preElectraSlot := primitives.Slot(10)
|
||||
|
||||
expectedData := ðpb.AttestationData{
|
||||
Slot: preElectraSlot,
|
||||
CommitteeIndex: 5,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
|
||||
Source: ðpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
|
||||
}
|
||||
|
||||
// Each call should go to the beacon node (no caching pre-Electra)
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: preElectraSlot,
|
||||
CommitteeIndex: 5,
|
||||
}).Return(expectedData, nil)
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: preElectraSlot,
|
||||
CommitteeIndex: 7,
|
||||
}).Return(expectedData, nil)
|
||||
|
||||
// First call with committee index 5
|
||||
data1, err := v.getAttestationData(context.Background(), preElectraSlot, 5)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, expectedData, data1)
|
||||
|
||||
// Second call with different committee index 7 - should still call beacon node
|
||||
data2, err := v.getAttestationData(context.Background(), preElectraSlot, 7)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, expectedData, data2)
|
||||
}
|
||||
|
||||
func TestGetAttestationData_PostElectraCaching(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
// Set up Electra fork epoch for this test
|
||||
cfg := params.BeaconConfig().Copy()
|
||||
originalElectraForkEpoch := cfg.ElectraForkEpoch
|
||||
cfg.ElectraForkEpoch = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
defer func() {
|
||||
cfg.ElectraForkEpoch = originalElectraForkEpoch
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
}()
|
||||
|
||||
client := validatormock.NewMockValidatorClient(ctrl)
|
||||
v := &validator{validatorClient: client}
|
||||
|
||||
// Post-Electra slot
|
||||
postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
|
||||
|
||||
expectedData := ðpb.AttestationData{
|
||||
Slot: postElectraSlot,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
|
||||
Source: ðpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
|
||||
}
|
||||
|
||||
// Only ONE call should go to the beacon node (caching post-Electra)
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: postElectraSlot,
|
||||
CommitteeIndex: 0,
|
||||
}).Return(expectedData, nil).Times(1)
|
||||
|
||||
// First call - should hit beacon node
|
||||
data1, err := v.getAttestationData(context.Background(), postElectraSlot, 5)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, expectedData, data1)
|
||||
|
||||
// Second call with different committee index - should use cache
|
||||
data2, err := v.getAttestationData(context.Background(), postElectraSlot, 7)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, expectedData, data2)
|
||||
|
||||
// Third call - should still use cache
|
||||
data3, err := v.getAttestationData(context.Background(), postElectraSlot, 10)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, expectedData, data3)
|
||||
}
|
||||
|
||||
func TestGetAttestationData_PostElectraCacheInvalidatesOnNewSlot(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
// Set up Electra fork epoch for this test
|
||||
cfg := params.BeaconConfig().Copy()
|
||||
originalElectraForkEpoch := cfg.ElectraForkEpoch
|
||||
cfg.ElectraForkEpoch = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
defer func() {
|
||||
cfg.ElectraForkEpoch = originalElectraForkEpoch
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
}()
|
||||
|
||||
client := validatormock.NewMockValidatorClient(ctrl)
|
||||
v := &validator{validatorClient: client}
|
||||
|
||||
slot1 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
|
||||
slot2 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 6)
|
||||
|
||||
dataSlot1 := ðpb.AttestationData{
|
||||
Slot: slot1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("root1"), 32),
|
||||
Source: ðpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
|
||||
}
|
||||
dataSlot2 := ðpb.AttestationData{
|
||||
Slot: slot2,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("root2"), 32),
|
||||
Source: ðpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
|
||||
}
|
||||
|
||||
// Expect one call per slot
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: slot1,
|
||||
CommitteeIndex: 0,
|
||||
}).Return(dataSlot1, nil).Times(1)
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: slot2,
|
||||
CommitteeIndex: 0,
|
||||
}).Return(dataSlot2, nil).Times(1)
|
||||
|
||||
// First slot - should hit beacon node
|
||||
data1, err := v.getAttestationData(context.Background(), slot1, 5)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, dataSlot1, data1)
|
||||
|
||||
// Same slot - should use cache
|
||||
data1Again, err := v.getAttestationData(context.Background(), slot1, 7)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, dataSlot1, data1Again)
|
||||
|
||||
// New slot - should invalidate cache and hit beacon node
|
||||
data2, err := v.getAttestationData(context.Background(), slot2, 5)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, dataSlot2, data2)
|
||||
}
|
||||
|
||||
func TestGetAttestationData_PostElectraConcurrentAccess(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
// Set up Electra fork epoch for this test
|
||||
cfg := params.BeaconConfig().Copy()
|
||||
originalElectraForkEpoch := cfg.ElectraForkEpoch
|
||||
cfg.ElectraForkEpoch = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
defer func() {
|
||||
cfg.ElectraForkEpoch = originalElectraForkEpoch
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
}()
|
||||
|
||||
client := validatormock.NewMockValidatorClient(ctrl)
|
||||
v := &validator{validatorClient: client}
|
||||
|
||||
postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
|
||||
|
||||
expectedData := ðpb.AttestationData{
|
||||
Slot: postElectraSlot,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
|
||||
Source: ðpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
|
||||
Target: ðpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
|
||||
}
|
||||
|
||||
// Should only call beacon node once despite concurrent requests
|
||||
client.EXPECT().AttestationData(gomock.Any(), ðpb.AttestationDataRequest{
|
||||
Slot: postElectraSlot,
|
||||
CommitteeIndex: 0,
|
||||
}).Return(expectedData, nil).Times(1)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numGoroutines := 10
|
||||
results := make([]*ethpb.AttestationData, numGoroutines)
|
||||
errs := make([]error, numGoroutines)
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
results[idx], errs[idx] = v.getAttestationData(context.Background(), postElectraSlot, primitives.CommitteeIndex(idx))
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
require.NoError(t, errs[i])
|
||||
require.DeepEqual(t, expectedData, results[i])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user