feat: save blobs to db for subscriber (#12734)

This commit is contained in:
terencechain
2023-08-14 12:18:29 -07:00
committed by Preston Van Loon
parent a63a65939e
commit a011f78bd1
4 changed files with 50 additions and 14 deletions

View File

@@ -377,19 +377,22 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
return true, attr, proposerID
}
// removeInvalidBlockAndState removes the invalid block and its corresponding state from the cache and DB.
// removeInvalidBlockAndState removes the invalid block, blob and its corresponding state from the cache and DB.
func (s *Service) removeInvalidBlockAndState(ctx context.Context, blkRoots [][32]byte) error {
for _, root := range blkRoots {
if err := s.cfg.StateGen.DeleteStateFromCaches(ctx, root); err != nil {
return err
}
// Delete block also deletes the state as well.
if err := s.cfg.BeaconDB.DeleteBlock(ctx, root); err != nil {
// TODO(10487): If a caller requests to delete a root that's justified and finalized. We should gracefully shutdown.
// This is an irreparable condition, it would me a justified or finalized block has become invalid.
return err
}
// No op if the sidecar does not exist.
if err := s.cfg.BeaconDB.DeleteBlobSidecar(ctx, root); err != nil {
return err
}
}
return nil
}

View File

@@ -12,6 +12,7 @@ import (
types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
@@ -23,7 +24,7 @@ import (
// 2. Compute key for blob as bytes(slot_to_rotating_buffer(blob.slot)) ++ bytes(blob.slot) ++ blob.block_root
//
// 3. Begin the save algorithm: If the incoming blob has a slot bigger than the saved slot at the spot
// in the rotating keys buffer, we overwrite all elements for that slot.
// in the rotating keys buffer, we overwrite all elements for that slot. Otherwise, we merge the blob with an existing one.
func (s *Store) SaveBlobSidecar(ctx context.Context, scs []*ethpb.BlobSidecar) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlobSidecar")
defer span.End()
@@ -38,6 +39,7 @@ func (s *Store) SaveBlobSidecar(ctx context.Context, scs []*ethpb.BlobSidecar) e
if err != nil {
return err
}
bkt := tx.Bucket(blobsBucket)
c := bkt.Cursor()
newKey := blobSidecarKey(scs[0])
@@ -52,16 +54,34 @@ func (s *Store) SaveBlobSidecar(ctx context.Context, scs []*ethpb.BlobSidecar) e
// If there is no element stored at blob.slot % MAX_SLOTS_TO_PERSIST_BLOBS, then we simply
// store the blob by key and exit early.
if len(replacingKey) != 0 {
if err := bkt.Delete(replacingKey); err != nil {
log.WithError(err).Warnf("Could not delete blob with key %#x", replacingKey)
slotBytes := replacingKey[:8]
oldSlot := bytesutil.BytesToSlotBigEndian(slotBytes)
oldEpoch := slots.ToEpoch(oldSlot)
// The blob we are replacing is too old, so we delete it.
if slots.ToEpoch(scs[0].Slot) >= oldEpoch.Add(uint64(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)) {
if err := bkt.Delete(replacingKey); err != nil {
log.WithError(err).Warnf("Could not delete blob with key %#x", replacingKey)
}
} else {
// Otherwise, we need to merge the new blob with the old blob.
enc := bkt.Get(replacingKey)
sc := &ethpb.BlobSidecars{}
if err := decode(ctx, enc, sc); err != nil {
return err
}
sc.Sidecars = append(sc.Sidecars, scs...)
sortSideCars(sc.Sidecars)
encodedBlobSidecar, err = encode(ctx, &ethpb.BlobSidecars{Sidecars: sc.Sidecars})
if err != nil {
return err
}
}
}
return bkt.Put(newKey, encodedBlobSidecar)
})
}
// verifySideCars ensures that all sidecars have the same slot, parent root, block root, and proposer index.
// It also ensures that indices are sequential and start at 0 and no more than MAX_BLOB_EPOCHS.
// verifySideCars ensures that all sidecars have the same slot, parent root, block root, and proposer index, and no more than MAX_BLOB_EPOCHS.
func (s *Store) verifySideCars(scs []*ethpb.BlobSidecar) error {
if len(scs) == 0 {
return errors.New("nil or empty blob sidecars")
@@ -75,7 +95,7 @@ func (s *Store) verifySideCars(scs []*ethpb.BlobSidecar) error {
r := scs[0].BlockRoot
p := scs[0].ProposerIndex
for i, sc := range scs {
for _, sc := range scs {
if sc.Slot != sl {
return fmt.Errorf("sidecar slot mismatch: %d != %d", sc.Slot, sl)
}
@@ -88,9 +108,6 @@ func (s *Store) verifySideCars(scs []*ethpb.BlobSidecar) error {
if sc.ProposerIndex != p {
return fmt.Errorf("sidecar proposer index mismatch: %d != %d", sc.ProposerIndex, p)
}
if sc.Index != uint64(i) {
return fmt.Errorf("sidecar index mismatch: %d != %d", sc.Index, i)
}
}
return nil
}

View File

@@ -157,11 +157,26 @@ func TestStore_BlobSidecars(t *testing.T) {
require.ErrorIs(t, ErrNotFound, err)
require.Equal(t, 0, len(got))
})
t.Run("saving blob different times", func(t *testing.T) {
db := setupDB(t)
scs := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
scs0 := scs[0:2]
scs1 := scs[2:4]
scs2 := scs[4:6]
require.NoError(t, db.SaveBlobSidecar(ctx, scs0))
require.NoError(t, db.SaveBlobSidecar(ctx, scs1))
require.NoError(t, db.SaveBlobSidecar(ctx, scs2))
saved, err := db.BlobSidecarsByRoot(ctx, bytesutil.ToBytes32(scs0[0].BlockRoot))
require.NoError(t, err)
require.NoError(t, equalBlobSlices(saved, scs))
})
t.Run("saving a new blob for rotation", func(t *testing.T) {
db := setupDB(t)
scs := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
require.NoError(t, db.SaveBlobSidecar(ctx, scs))
require.Equal(t, int(fieldparams.MaxBlobsPerBlock), len(scs))
require.Equal(t, fieldparams.MaxBlobsPerBlock, len(scs))
oldBlockRoot := scs[0].BlockRoot
got, err := db.BlobSidecarsByRoot(ctx, bytesutil.ToBytes32(oldBlockRoot))
require.NoError(t, err)
@@ -227,7 +242,6 @@ func TestStore_verifySideCars(t *testing.T) {
{name: "invalid proposer index", scs: []*ethpb.BlobSidecar{{ProposerIndex: 1}, {ProposerIndex: 2}}, error: "sidecar proposer index mismatch: 2 != 1"},
{name: "invalid root", scs: []*ethpb.BlobSidecar{{BlockRoot: []byte{1}}, {BlockRoot: []byte{2}}}, error: "sidecar root mismatch: 02 != 01"},
{name: "invalid parent root", scs: []*ethpb.BlobSidecar{{BlockParentRoot: []byte{1}}, {BlockParentRoot: []byte{2}}}, error: "sidecar parent root mismatch: 02 != 01"},
{name: "invalid side index", scs: []*ethpb.BlobSidecar{{Index: 0}, {Index: 0}}, error: "sidecar index mismatch: 0 != 1"},
{name: "happy path", scs: []*ethpb.BlobSidecar{{Index: 0}, {Index: 1}}, error: ""},
}
for _, tt := range tests {

View File

@@ -18,7 +18,9 @@ func (s *Service) blobSubscriber(ctx context.Context, msg proto.Message) error {
s.setSeenBlobIndex(b.Message.Blob, b.Message.Index)
// TODO: Store blobs in cache. Will be addressed in subsequent PR.
if err := s.cfg.beaconDB.SaveBlobSidecar(ctx, []*eth.BlobSidecar{b.Message}); err != nil {
return err
}
return nil
}