Compare commits

...

1 Commits

Author SHA1 Message Date
james-prysm
1d063d57b8 adding debounce to remote key manager 2025-11-25 16:34:12 -06:00
2 changed files with 112 additions and 36 deletions

View File

@@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/async/event"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
@@ -34,8 +35,9 @@ import (
)
const (
maxRetries = 60
retryDelay = 10 * time.Second
maxRetries = 60
retryDelay = 10 * time.Second
fileChangeDebounce = 3 * time.Second // Debounce interval for file change events to handle manual editing
)
// SetupConfig includes configuration values for initializing.
@@ -286,11 +288,6 @@ func (km *Keymanager) refreshRemoteKeysFromFileChanges(ctx context.Context) erro
log.WithError(err).Error("Could not close file watcher")
}
}()
initialFileInfo, err := os.Stat(km.keyFilePath)
if err != nil {
return errors.Wrap(err, "could not stat remote signer public key file")
}
initialFileSize := initialFileInfo.Size()
if err := watcher.Add(km.keyFilePath); err != nil {
return errors.Wrap(err, "could not add file to file watcher")
}
@@ -308,6 +305,26 @@ func (km *Keymanager) refreshRemoteKeysFromFileChanges(ctx context.Context) erro
}
km.updatePublicKeys(slices.Collect(maps.Values(fk)))
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fileChangesChan := make(chan any, 100)
defer close(fileChangesChan)
// We debounce events sent over the file changes channel by an interval
// to ensure we are not overwhelmed by a ton of events fired over the channel in
// a short span of time (e.g., during manual file editing).
go async.Debounce(ctx, fileChangeDebounce, fileChangesChan, func(event any) {
e, ok := event.(fsnotify.Event)
if !ok {
log.Errorf("Type %T is not a valid file system event", event)
return
}
if e.Has(fsnotify.Write) || e.Has(fsnotify.Create) {
km.handleFileChange(ctx)
}
})
for {
select {
case e, ok := <-watcher.Events:
@@ -322,30 +339,8 @@ func (km *Keymanager) refreshRemoteKeysFromFileChanges(ctx context.Context) erro
if e.Has(fsnotify.Remove) {
return errors.New("remote signer key file was removed")
}
currentFileInfo, err := os.Stat(km.keyFilePath)
if err != nil {
return errors.Wrap(err, "could not stat remote signer public key file")
}
if currentFileInfo.Size() != initialFileSize {
log.Info("Remote signer key file updated")
fileKeys, _, err := km.readKeyFile()
if err != nil {
return errors.New("could not read key file")
}
// prioritize file keys over flag keys
if len(fileKeys) == 0 {
log.Warnln("Remote signer key file no longer has keys, defaulting to flag provided keys")
fileKeys = slices.Collect(maps.Values(km.flagLoadedKeysMap))
}
currentKeys, err := km.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch current keys")
}
if !slices.Equal(currentKeys, fileKeys) {
km.updatePublicKeys(fileKeys)
}
initialFileSize = currentFileInfo.Size()
}
// Send event to debounce channel for processing
fileChangesChan <- e
case err, ok := <-watcher.Errors:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
log.Info("Closing file watcher")
@@ -359,6 +354,29 @@ func (km *Keymanager) refreshRemoteKeysFromFileChanges(ctx context.Context) erro
}
}
// handleFileChange processes a file change event after debouncing.
func (km *Keymanager) handleFileChange(ctx context.Context) {
log.Info("Remote signer key file updated")
fileKeys, _, err := km.readKeyFile()
if err != nil {
log.WithError(err).Error("Could not read key file")
return
}
// prioritize file keys over flag keys
if len(fileKeys) == 0 {
log.Warnln("Remote signer key file no longer has keys, defaulting to flag provided keys")
fileKeys = slices.Collect(maps.Values(km.flagLoadedKeysMap))
}
currentKeys, err := km.FetchValidatingPublicKeys(ctx)
if err != nil {
log.WithError(err).Error("Could not fetch current keys")
return
}
if !slices.Equal(currentKeys, fileKeys) {
km.updatePublicKeys(fileKeys)
}
}
func (km *Keymanager) updatePublicKeys(keys [][48]byte) {
km.lock.Lock()
defer km.lock.Unlock()

View File

@@ -218,7 +218,8 @@ func TestNewKeyManager_ChangingFileCreated(t *testing.T) {
require.Equal(t, slices.Contains(wantSlice, keys[i]), true)
}
// sleep needs to be at the front because of how watching the file works
time.Sleep(1 * time.Second)
// Wait for debounce interval plus processing time
time.Sleep(fileChangeDebounce + 500*time.Millisecond)
// Open the file for writing, create it if it does not exist, and truncate it if it does.
f, err := os.OpenFile(keyFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
@@ -235,6 +236,9 @@ func TestNewKeyManager_ChangingFileCreated(t *testing.T) {
require.Equal(t, 1, len(ks))
require.Equal(t, "0x8000a9a6d3f5e22d783eefaadbcf0298146adb5d95b04db910a0d4e16976b30229d0b1e7b9cda6c7e0bfa11f72efe055", hexutil.Encode(ks[0][:]))
// Wait for debounce and processing to update providedPublicKeys
time.Sleep(fileChangeDebounce + 500*time.Millisecond)
require.Equal(t, 1, len(km.providedPublicKeys))
require.Equal(t, "0x8000a9a6d3f5e22d783eefaadbcf0298146adb5d95b04db910a0d4e16976b30229d0b1e7b9cda6c7e0bfa11f72efe055", hexutil.Encode(km.providedPublicKeys[0][:]))
}
@@ -267,15 +271,15 @@ func TestNewKeyManager_FileAndFlagsWithDifferentKeys(t *testing.T) {
for _, key := range keys {
require.Equal(t, slices.Contains(wantSlice, hexutil.Encode(key[:])), true)
}
// wait for reading to be done
time.Sleep(2 * time.Second)
// wait for watcher to initialize
time.Sleep(500 * time.Millisecond)
// test fall back by clearing file
go func() {
err = file.WriteFile(keyFilePath, []byte(" "))
require.NoError(t, err)
}()
// waiting for writing to be done
time.Sleep(2 * time.Second)
// waiting for debounce and processing to complete
time.Sleep(fileChangeDebounce + 500*time.Millisecond)
require.LogsContain(t, logHook, "Remote signer key file no longer has keys, defaulting to flag provided keys")
// fall back to flag provided keys
@@ -321,6 +325,60 @@ func TestRefreshRemoteKeysFromFileChangesWithRetry(t *testing.T) {
require.Equal(t, 1, len(keys))
}
func TestRefreshRemoteKeysFromFileChanges_SameSizeDifferentKeys(t *testing.T) {
// This test verifies that key changes are detected even when the file size stays the same
// (e.g., swapping one key for another of equal length).
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
logHook := logTest.NewGlobal()
root, err := hexutil.Decode("0x270d43e74ce340de4bca2b1936beca0f4f5408d9e78aec4850920baf659d5b69")
require.NoError(t, err)
keyFilePath := filepath.Join(t.TempDir(), "keyfile.txt")
// Two keys of the same length (96 hex chars each = 48 bytes)
key1 := "0x8000a9a6d3f5e22d783eefaadbcf0298146adb5d95b04db910a0d4e16976b30229d0b1e7b9cda6c7e0bfa11f72efe055"
key2 := "0x800077e04f8d7496099b3d30ac5430aea64873a45e5bcfe004d2095babcbf55e21138ff0d5691abc29da190aa32755c6"
// Create initial file with key1
err = file.WriteFile(keyFilePath, []byte(key1))
require.NoError(t, err)
km, err := NewKeymanager(ctx, &SetupConfig{
BaseEndpoint: "http://example.com",
GenesisValidatorsRoot: root,
KeyFilePath: keyFilePath,
})
require.NoError(t, err)
// Verify initial key loaded
keys, err := km.FetchValidatingPublicKeys(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(keys))
require.Equal(t, key1, hexutil.Encode(keys[0][:]))
// Start the file watcher
go func() {
_ = km.refreshRemoteKeysFromFileChanges(ctx)
}()
// Wait for watcher to initialize
time.Sleep(100 * time.Millisecond)
require.LogsContain(t, logHook, "Successfully initialized file watcher")
// Write key2 to the file (same size as key1)
err = file.WriteFile(keyFilePath, []byte(key2))
require.NoError(t, err)
// Wait for file change to be detected (debounce interval is 3s + processing time)
time.Sleep(fileChangeDebounce + 500*time.Millisecond)
// Verify key was updated despite same file size
keys, err = km.FetchValidatingPublicKeys(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(keys))
require.Equal(t, key2, hexutil.Encode(keys[0][:]), "Key should have been updated even though file size is the same")
require.LogsContain(t, logHook, "Remote signer key file updated")
}
func TestReadKeyFile_PathMissing(t *testing.T) {
root, err := hexutil.Decode("0x270d43e74ce340de4bca2b1936beca0f4f5408d9e78aec4850920baf659d5b69")
require.NoError(t, err)