From e203f66fe0d4a52da9377442d840ee3bca1a25cd Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Tue, 26 Nov 2019 22:32:56 -0800 Subject: [PATCH] DB Improvements: Snappy compression, remove some unnecessary batch / goroutines (#4125) * do not use batch for SaveAttestations * use snappy compression * Encode / decode everything with snappy * Add snappy migration path * batch is probably fine... * fix test * gofmt * Merge branch 'master' of github.com:prysmaticlabs/prysm into remove-batch-attestations * add sanity check * remove that thing * gaz * Merge branch 'master' of github.com:prysmaticlabs/prysm into remove-batch-attestations --- beacon-chain/db/kv/BUILD.bazel | 6 + beacon-chain/db/kv/archive.go | 13 +-- beacon-chain/db/kv/attestations.go | 106 ++++++++++------- beacon-chain/db/kv/attestations_test.go | 6 +- beacon-chain/db/kv/blocks.go | 123 +++++++++----------- beacon-chain/db/kv/checkpoint.go | 9 +- beacon-chain/db/kv/encoding.go | 34 ++++++ beacon-chain/db/kv/finalized_block_roots.go | 11 +- beacon-chain/db/kv/kv.go | 7 ++ beacon-chain/db/kv/migrate_snappy.go | 73 ++++++++++++ beacon-chain/db/kv/migrate_snappy_test.go | 48 ++++++++ beacon-chain/db/kv/operations.go | 5 +- beacon-chain/db/kv/schema.go | 3 + beacon-chain/db/kv/slashings.go | 9 +- beacon-chain/db/kv/state.go | 54 +++++---- shared/featureconfig/config.go | 19 +-- shared/featureconfig/flags.go | 5 + 17 files changed, 361 insertions(+), 170 deletions(-) create mode 100644 beacon-chain/db/kv/encoding.go create mode 100644 beacon-chain/db/kv/migrate_snappy.go create mode 100644 beacon-chain/db/kv/migrate_snappy_test.go diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index a279157afd..dc27235e11 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -9,8 +9,10 @@ go_library( "blocks.go", "checkpoint.go", "deposit_contract.go", + "encoding.go", "finalized_block_roots.go", "kv.go", + "migrate_snappy.go", "operations.go", "schema.go", 
"slashings.go", @@ -26,11 +28,13 @@ go_library( "//proto/beacon/db:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/sliceutil:go_default_library", "//shared/traceutil:go_default_library", "@com_github_boltdb_bolt//:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_golang_snappy//:go_default_library", "@com_github_karlseguin_ccache//:go_default_library", "@com_github_mdlayher_prombolt//:go_default_library", "@com_github_pkg_errors//:go_default_library", @@ -53,6 +57,7 @@ go_test( "deposit_contract_test.go", "finalized_block_roots_test.go", "kv_test.go", + "migrate_snappy_test.go", "operations_test.go", "slashings_test.go", "state_test.go", @@ -63,6 +68,7 @@ go_test( "//beacon-chain/db/filters:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", "//shared/testutil:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", diff --git a/beacon-chain/db/kv/archive.go b/beacon-chain/db/kv/archive.go index ba986e775d..c7456d47de 100644 --- a/beacon-chain/db/kv/archive.go +++ b/beacon-chain/db/kv/archive.go @@ -4,7 +4,6 @@ import ( "context" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -25,7 +24,7 @@ func (k *Store) ArchivedActiveValidatorChanges(ctx context.Context, epoch uint64 return nil } target = &pb.ArchivedActiveSetChanges{} - return proto.Unmarshal(enc, target) + return decode(enc, target) }) return target, err } @@ -35,7 +34,7 @@ func (k *Store) SaveArchivedActiveValidatorChanges(ctx context.Context, epoch ui ctx, span := 
trace.StartSpan(ctx, "BeaconDB.SaveArchivedActiveValidatorChanges") defer span.End() buf := uint64ToBytes(epoch) - enc, err := proto.Marshal(changes) + enc, err := encode(changes) if err != nil { return err } @@ -59,7 +58,7 @@ func (k *Store) ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*pb.Ar return nil } target = &pb.ArchivedCommitteeInfo{} - return proto.Unmarshal(enc, target) + return decode(enc, target) }) return target, err } @@ -69,7 +68,7 @@ func (k *Store) SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, inf ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedCommitteeInfo") defer span.End() buf := uint64ToBytes(epoch) - enc, err := proto.Marshal(info) + enc, err := encode(info) if err != nil { return err } @@ -127,7 +126,7 @@ func (k *Store) ArchivedValidatorParticipation(ctx context.Context, epoch uint64 return nil } target = &ethpb.ValidatorParticipation{} - return proto.Unmarshal(enc, target) + return decode(enc, target) }) return target, err } @@ -137,7 +136,7 @@ func (k *Store) SaveArchivedValidatorParticipation(ctx context.Context, epoch ui ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedValidatorParticipation") defer span.End() buf := uint64ToBytes(epoch) - enc, err := proto.Marshal(part) + enc, err := encode(part) if err != nil { return err } diff --git a/beacon-chain/db/kv/attestations.go b/beacon-chain/db/kv/attestations.go index ef059cdff2..883199415b 100644 --- a/beacon-chain/db/kv/attestations.go +++ b/beacon-chain/db/kv/attestations.go @@ -3,11 +3,8 @@ package kv import ( "context" "fmt" - "strings" - "sync" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" @@ -30,7 +27,7 @@ func (k *Store) AttestationsByDataRoot(ctx context.Context, attDataRoot [32]byte return nil } ac := &dbpb.AttestationContainer{} - if err := proto.Unmarshal(enc, ac); err != nil { + if err := decode(enc, ac); err 
!= nil { return err } atts = ac.ToAttestations() @@ -47,7 +44,7 @@ func (k *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*et ctx, span := trace.StartSpan(ctx, "BeaconDB.Attestations") defer span.End() atts := make([]*ethpb.Attestation, 0) - err := k.db.Batch(func(tx *bolt.Tx) error { + err := k.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestationsBucket) // If no filter criteria are specified, return an error. @@ -70,7 +67,7 @@ func (k *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*et for i := 0; i < len(keys); i++ { encoded := bkt.Get(keys[i]) ac := &dbpb.AttestationContainer{} - if err := proto.Unmarshal(encoded, ac); err != nil { + if err := decode(encoded, ac); err != nil { return err } atts = append(atts, ac.ToAttestations()...) @@ -105,7 +102,7 @@ func (k *Store) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) err return nil } ac := &dbpb.AttestationContainer{} - if err := proto.Unmarshal(enc, ac); err != nil { + if err := decode(enc, ac); err != nil { return err } indicesByBucket := createAttestationIndicesFromData(ac.Data, tx) @@ -120,24 +117,25 @@ func (k *Store) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) err func (k *Store) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteAttestations") defer span.End() - var wg sync.WaitGroup - errs := make([]string, 0) - wg.Add(len(attDataRoots)) - for _, r := range attDataRoots { - go func(w *sync.WaitGroup, root [32]byte) { - defer wg.Done() - if err := k.DeleteAttestation(ctx, root); err != nil { - errs = append(errs, err.Error()) - return + + return k.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(attestationsBucket) + for _, attDataRoot := range attDataRoots { + enc := bkt.Get(attDataRoot[:]) + ac := &dbpb.AttestationContainer{} + if err := decode(enc, ac); err != nil { + return err } - return - }(&wg, r) - } - wg.Wait() - if len(errs) > 0 { - 
return fmt.Errorf("deleting attestations failed with %d errors: %s", len(errs), strings.Join(errs, ", ")) - } - return nil + indicesByBucket := createAttestationIndicesFromData(ac.Data, tx) + if err := deleteValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil { + return errors.Wrap(err, "could not delete root for DB indices") + } + if err := bkt.Delete(attDataRoot[:]); err != nil { + return err + } + } + return nil + }) } // SaveAttestation to the db. @@ -165,14 +163,14 @@ func (k *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) err } existingEnc := bkt.Get(attDataRoot[:]) if existingEnc != nil { - if err := proto.Unmarshal(existingEnc, ac); err != nil { + if err := decode(existingEnc, ac); err != nil { return err } } ac.InsertAttestation(att) - enc, err := proto.Marshal(ac) + enc, err := encode(ac) if err != nil { return err } @@ -193,24 +191,48 @@ func (k *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) err func (k *Store) SaveAttestations(ctx context.Context, atts []*ethpb.Attestation) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveAttestations") defer span.End() - var wg sync.WaitGroup - errs := make([]string, 0) - wg.Add(len(atts)) - for _, a := range atts { - go func(w *sync.WaitGroup, att *ethpb.Attestation) { - defer wg.Done() - if err := k.SaveAttestation(ctx, att); err != nil { - errs = append(errs, err.Error()) - return + + err := k.db.Update(func(tx *bolt.Tx) error { + for _, att := range atts { + attDataRoot, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return err } - return - }(&wg, a) + + bkt := tx.Bucket(attestationsBucket) + ac := &dbpb.AttestationContainer{ + Data: att.Data, + } + existingEnc := bkt.Get(attDataRoot[:]) + if existingEnc != nil { + if err := decode(existingEnc, ac); err != nil { + return err + } + } + + ac.InsertAttestation(att) + + enc, err := encode(ac) + if err != nil { + return err + } + + indicesByBucket := createAttestationIndicesFromData(att.Data, tx) + if 
err := updateValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil { + return errors.Wrap(err, "could not update DB indices") + } + + if err := bkt.Put(attDataRoot[:], enc); err != nil { + return err + } + } + return nil + }) + if err != nil { + traceutil.AnnotateError(span, err) } - wg.Wait() - if len(errs) > 0 { - return fmt.Errorf("deleting attestations failed with %d errors: %s", len(errs), strings.Join(errs, ", ")) - } - return nil + + return err } // createAttestationIndicesFromData takes in attestation data and returns diff --git a/beacon-chain/db/kv/attestations_test.go b/beacon-chain/db/kv/attestations_test.go index e7df84744b..8d19ece663 100644 --- a/beacon-chain/db/kv/attestations_test.go +++ b/beacon-chain/db/kv/attestations_test.go @@ -118,7 +118,11 @@ func TestStore_BoltDontPanic(t *testing.T) { for i := 0; i <= 100; i++ { att := &ethpb.Attestation{ - Data: &ethpb.AttestationData{Slot: uint64(i)}, + Data: &ethpb.AttestationData{ + Slot: uint64(i), + Source: &ethpb.Checkpoint{}, + Target: &ethpb.Checkpoint{}, + }, AggregationBits: bitfield.Bitlist{0b11}, } ctx := context.Background() diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index e0c997ba55..e46634f7c3 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -4,12 +4,9 @@ import ( "bytes" "context" "fmt" - "strings" - "sync" "time" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" @@ -35,7 +32,7 @@ func (k *Store) Block(ctx context.Context, blockRoot [32]byte) (*ethpb.BeaconBlo return nil } block = &ethpb.BeaconBlock{} - return proto.Unmarshal(enc, block) + return decode(enc, block) }) return block, err } @@ -56,7 +53,7 @@ func (k *Store) HeadBlock(ctx context.Context) (*ethpb.BeaconBlock, error) { return nil } headBlock = &ethpb.BeaconBlock{} - return proto.Unmarshal(enc, headBlock) + return decode(enc, headBlock) }) return 
headBlock, err } @@ -113,7 +110,7 @@ func (k *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Be for i := 0; i < len(keys); i++ { encoded := bkt.Get(keys[i]) block := &ethpb.BeaconBlock{} - if err := proto.Unmarshal(encoded, block); err != nil { + if err := decode(encoded, block); err != nil { return err } blocks = append(blocks, block) @@ -207,7 +204,7 @@ func (k *Store) DeleteBlock(ctx context.Context, blockRoot [32]byte) error { return nil } block := &ethpb.BeaconBlock{} - if err := proto.Unmarshal(enc, block); err != nil { + if err := decode(enc, block); err != nil { return err } indicesByBucket := createBlockIndicesFromBlock(block) @@ -223,23 +220,29 @@ func (k *Store) DeleteBlock(ctx context.Context, blockRoot [32]byte) error { func (k *Store) DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteBlocks") defer span.End() - var wg sync.WaitGroup - errs := make([]string, 0) - wg.Add(len(blockRoots)) - for _, r := range blockRoots { - go func(w *sync.WaitGroup, root [32]byte) { - defer w.Done() - if err := k.DeleteBlock(ctx, root); err != nil { - errs = append(errs, err.Error()) - return + + return k.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(blocksBucket) + for _, blockRoot := range blockRoots { + enc := bkt.Get(blockRoot[:]) + if enc == nil { + return nil } - }(&wg, r) - } - wg.Wait() - if len(errs) > 0 { - return fmt.Errorf("deleting blocks failed with %d errors: %s", len(errs), strings.Join(errs, ", ")) - } - return nil + block := &ethpb.BeaconBlock{} + if err := decode(enc, block); err != nil { + return err + } + indicesByBucket := createBlockIndicesFromBlock(block) + if err := deleteValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil { + return errors.Wrap(err, "could not delete root for DB indices") + } + k.blockCache.Delete(string(blockRoot[:])) + if err := bkt.Delete(blockRoot[:]); err != nil { + return err + } + } + return nil + }) } // SaveBlock to the db. 
@@ -258,7 +261,7 @@ func (k *Store) SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error { if existingBlock := bkt.Get(blockRoot[:]); existingBlock != nil { return nil } - enc, err := proto.Marshal(block) + enc, err := encode(block) if err != nil { return err } @@ -271,56 +274,36 @@ func (k *Store) SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error { }) } -// saveBatchedBlock to the db. -func (k *Store) saveBatchedBlock(ctx context.Context, block *ethpb.BeaconBlock) error { - ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlock") - defer span.End() - blockRoot, err := ssz.SigningRoot(block) - if err != nil { - return err - } - if v := k.blockCache.Get(string(blockRoot[:])); v != nil { - return nil - } - return k.db.Batch(func(tx *bolt.Tx) error { - bkt := tx.Bucket(blocksBucket) - if existingBlock := bkt.Get(blockRoot[:]); existingBlock != nil { - return nil - } - enc, err := proto.Marshal(block) - if err != nil { - return err - } - indicesByBucket := createBlockIndicesFromBlock(block) - if err := updateValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil { - return errors.Wrap(err, "could not update DB indices") - } - k.blockCache.Set(string(blockRoot[:]), block, time.Hour) - return bkt.Put(blockRoot[:], enc) - }) -} - -// SaveBlocks via batch updates to the db. +// SaveBlocks via bulk updates to the db. 
func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.BeaconBlock) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlocks") defer span.End() - var wg sync.WaitGroup - errs := make([]string, 0) - wg.Add(len(blocks)) - for _, blk := range blocks { - go func(w *sync.WaitGroup, b *ethpb.BeaconBlock) { - defer w.Done() - if err := k.saveBatchedBlock(ctx, b); err != nil { - errs = append(errs, err.Error()) - return + + return k.db.Update(func(tx *bolt.Tx) error { + for _, block := range blocks { + blockRoot, err := ssz.SigningRoot(block) + if err != nil { + return err } - }(&wg, blk) - } - wg.Wait() - if len(errs) > 0 { - return fmt.Errorf("saving blocks failed with %d errors: %s", len(errs), strings.Join(errs, ", ")) - } - return nil + bkt := tx.Bucket(blocksBucket) + if existingBlock := bkt.Get(blockRoot[:]); existingBlock != nil { + return nil + } + enc, err := encode(block) + if err != nil { + return err + } + indicesByBucket := createBlockIndicesFromBlock(block) + if err := updateValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil { + return errors.Wrap(err, "could not update DB indices") + } + k.blockCache.Set(string(blockRoot[:]), block, time.Hour) + if err := bkt.Put(blockRoot[:], enc); err != nil { + return err + } + } + return nil + }) } // SaveHeadBlockRoot to the db. 
diff --git a/beacon-chain/db/kv/checkpoint.go b/beacon-chain/db/kv/checkpoint.go index f4ce239c8e..357109540c 100644 --- a/beacon-chain/db/kv/checkpoint.go +++ b/beacon-chain/db/kv/checkpoint.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/shared/traceutil" "go.opencensus.io/trace" @@ -28,7 +27,7 @@ func (k *Store) JustifiedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, err return nil } checkpoint = &ethpb.Checkpoint{} - return proto.Unmarshal(enc, checkpoint) + return decode(enc, checkpoint) }) return checkpoint, err } @@ -48,7 +47,7 @@ func (k *Store) FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, err return nil } checkpoint = &ethpb.Checkpoint{} - return proto.Unmarshal(enc, checkpoint) + return decode(enc, checkpoint) }) return checkpoint, err } @@ -58,7 +57,7 @@ func (k *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.C ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveJustifiedCheckpoint") defer span.End() - enc, err := proto.Marshal(checkpoint) + enc, err := encode(checkpoint) if err != nil { return err } @@ -73,7 +72,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveFinalizedCheckpoint") defer span.End() - enc, err := proto.Marshal(checkpoint) + enc, err := encode(checkpoint) if err != nil { return err } diff --git a/beacon-chain/db/kv/encoding.go b/beacon-chain/db/kv/encoding.go new file mode 100644 index 0000000000..b3f3879add --- /dev/null +++ b/beacon-chain/db/kv/encoding.go @@ -0,0 +1,34 @@ +package kv + +import ( + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prysmaticlabs/prysm/shared/featureconfig" +) + +func decode(data []byte, dst proto.Message) error { + if featureconfig.Get().EnableSnappyDBCompression { + var err error + data, err = snappy.Decode(nil, data) + if 
err != nil { + return err + } + } + if err := proto.Unmarshal(data, dst); err != nil { + return err + } + return nil +} + +func encode(msg proto.Message) ([]byte, error) { + enc, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + + if !featureconfig.Get().EnableSnappyDBCompression { + return enc, nil + } + + return snappy.Encode(nil, enc), nil +} diff --git a/beacon-chain/db/kv/finalized_block_roots.go b/beacon-chain/db/kv/finalized_block_roots.go index 0f13b07eb1..12ed52c585 100644 --- a/beacon-chain/db/kv/finalized_block_roots.go +++ b/beacon-chain/db/kv/finalized_block_roots.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" @@ -52,7 +51,7 @@ func (kv *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, che // De-index recent finalized block roots, to be re-indexed. prevousFinalizedCheckpoint := &ethpb.Checkpoint{} if b := bkt.Get(previousFinalizedCheckpointKey); b != nil { - if err := proto.Unmarshal(b, prevousFinalizedCheckpoint); err != nil { + if err := decode(b, prevousFinalizedCheckpoint); err != nil { traceutil.AnnotateError(span, err) return err } @@ -94,7 +93,7 @@ func (kv *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, che ChildRoot: previousRoot, } - enc, err := proto.Marshal(container) + enc, err := encode(container) if err != nil { traceutil.AnnotateError(span, err) return err @@ -107,12 +106,12 @@ func (kv *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, che // Found parent, loop exit condition. 
if parentBytes := bkt.Get(block.ParentRoot); parentBytes != nil { parent := &dbpb.FinalizedBlockRootContainer{} - if err := proto.Unmarshal(parentBytes, parent); err != nil { + if err := decode(parentBytes, parent); err != nil { traceutil.AnnotateError(span, err) return err } parent.ChildRoot = root - enc, err := proto.Marshal(parent) + enc, err := encode(parent) if err != nil { traceutil.AnnotateError(span, err) return err @@ -145,7 +144,7 @@ func (kv *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, che } // Update previous checkpoint - enc, err := proto.Marshal(checkpoint) + enc, err := encode(checkpoint) if err != nil { traceutil.AnnotateError(span, err) return err diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index fa6ccafa97..31c7bc12dc 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -80,10 +80,17 @@ func NewKVStore(dirPath string) (*Store, error) { blockSlotIndicesBucket, blockParentRootIndicesBucket, finalizedBlockRootsIndexBucket, + // Migration bucket. 
+ migrationBucket, ) }); err != nil { return nil, err } + + if err := kv.ensureSnappy(); err != nil { + return nil, err + } + err = prometheus.Register(createBoltCollector(kv.db)) return kv, err diff --git a/beacon-chain/db/kv/migrate_snappy.go b/beacon-chain/db/kv/migrate_snappy.go new file mode 100644 index 0000000000..7fa02a89d3 --- /dev/null +++ b/beacon-chain/db/kv/migrate_snappy.go @@ -0,0 +1,73 @@ +package kv + +import ( + "errors" + + "github.com/boltdb/bolt" + "github.com/golang/snappy" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/sirupsen/logrus" +) + +var snappyKey = []byte("snappy") + +func (kv *Store) ensureSnappy() error { + var isMigrated bool + + kv.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(migrationBucket) + v := bkt.Get(snappyKey) + isMigrated = len(v) == 1 && v[0] == 0x01 + return nil + }) + + if !featureconfig.Get().EnableSnappyDBCompression { + if isMigrated { + return errors.New("beaconDB has been migrated to snappy compression, run with flag --snappy") + } + return nil + } + + if isMigrated { + return nil + } + + log := logrus.WithField("prefix", "kv") + log.Info("Compressing database to snappy compression. 
This might take a while...") + + bucketsToMigrate := [][]byte{ + attestationsBucket, + blocksBucket, + stateBucket, + proposerSlashingsBucket, + attesterSlashingsBucket, + voluntaryExitsBucket, + checkpointBucket, + archivedValidatorSetChangesBucket, + archivedCommitteeInfoBucket, + archivedBalancesBucket, + archivedValidatorParticipationBucket, + finalizedBlockRootsIndexBucket, + } + + return kv.db.Update(func(tx *bolt.Tx) error { + for _, b := range bucketsToMigrate { + log.WithField("bucket", string(b)).Debug("Compressing bucket.") + if err := migrateBucketToSnappy(tx.Bucket(b)); err != nil { + return err + } + } + bkt := tx.Bucket(migrationBucket) + return bkt.Put(snappyKey, []byte{0x01}) + }) +} + +func migrateBucketToSnappy(bkt *bolt.Bucket) error { + c := bkt.Cursor() + for key, val := c.First(); key != nil; key, val = c.Next() { + if err := bkt.Put(key, snappy.Encode(nil, val)); err != nil { + return err + } + } + return nil +} diff --git a/beacon-chain/db/kv/migrate_snappy_test.go b/beacon-chain/db/kv/migrate_snappy_test.go new file mode 100644 index 0000000000..b986efa4a3 --- /dev/null +++ b/beacon-chain/db/kv/migrate_snappy_test.go @@ -0,0 +1,48 @@ +package kv + +import ( + "context" + "testing" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/shared/featureconfig" +) + +// Sanity check that an object can be accessed after migration. 
+func TestStore_MigrateSnappy(t *testing.T) { + db := setupDB(t) + ctx := context.Background() + + block := &ethpb.BeaconBlock{ + Slot: 200, + } + root, err := ssz.SigningRoot(block) + if err != nil { + t.Fatal(err) + } + if err := db.SaveBlock(ctx, block); err != nil { + t.Fatal(err) + } + path := db.databasePath + db.Close() + + c := featureconfig.Get() + c.EnableSnappyDBCompression = true + featureconfig.Init(c) + + db2, err := NewKVStore(path) + if err != nil { + t.Fatalf("Failed to instantiate DB: %v", err) + } + defer teardownDB(t, db2) + + blk, err := db.Block(ctx, root) + if err != nil { + t.Fatal(err) + } + + if !ssz.DeepEqual(blk, block) { + t.Fatal("Blocks not same") + } +} diff --git a/beacon-chain/db/kv/operations.go b/beacon-chain/db/kv/operations.go index be7bef85e9..ba98a0f4f6 100644 --- a/beacon-chain/db/kv/operations.go +++ b/beacon-chain/db/kv/operations.go @@ -4,7 +4,6 @@ import ( "context" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "go.opencensus.io/trace" @@ -22,7 +21,7 @@ func (k *Store) VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*ethpb.Vo return nil } exit = &ethpb.VoluntaryExit{} - return proto.Unmarshal(enc, exit) + return decode(enc, exit) }) return exit, err } @@ -49,7 +48,7 @@ func (k *Store) SaveVoluntaryExit(ctx context.Context, exit *ethpb.VoluntaryExit if err != nil { return err } - enc, err := proto.Marshal(exit) + enc, err := encode(exit) if err != nil { return err } diff --git a/beacon-chain/db/kv/schema.go b/beacon-chain/db/kv/schema.go index 377a07cad0..6739992a5d 100644 --- a/beacon-chain/db/kv/schema.go +++ b/beacon-chain/db/kv/schema.go @@ -37,4 +37,7 @@ var ( depositContractAddressKey = []byte("deposit-contract") justifiedCheckpointKey = []byte("justified-checkpoint") finalizedCheckpointKey = []byte("finalized-checkpoint") + + // Migration bucket. 
+ migrationBucket = []byte("migrations") ) diff --git a/beacon-chain/db/kv/slashings.go b/beacon-chain/db/kv/slashings.go index b92f5c9ce8..953b2fc1c0 100644 --- a/beacon-chain/db/kv/slashings.go +++ b/beacon-chain/db/kv/slashings.go @@ -4,7 +4,6 @@ import ( "context" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "go.opencensus.io/trace" @@ -22,7 +21,7 @@ func (k *Store) ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*e return nil } slashing = &ethpb.ProposerSlashing{} - return proto.Unmarshal(enc, slashing) + return decode(enc, slashing) }) return slashing, err } @@ -49,7 +48,7 @@ func (k *Store) SaveProposerSlashing(ctx context.Context, slashing *ethpb.Propos if err != nil { return err } - enc, err := proto.Marshal(slashing) + enc, err := encode(slashing) if err != nil { return err } @@ -81,7 +80,7 @@ func (k *Store) AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*e return nil } slashing = &ethpb.AttesterSlashing{} - return proto.Unmarshal(enc, slashing) + return decode(enc, slashing) }) return slashing, err } @@ -108,7 +107,7 @@ func (k *Store) SaveAttesterSlashing(ctx context.Context, slashing *ethpb.Attest if err != nil { return err } - enc, err := proto.Marshal(slashing) + enc, err := encode(slashing) if err != nil { return err } diff --git a/beacon-chain/db/kv/state.go b/beacon-chain/db/kv/state.go index a2c410c3b9..dddd1554ec 100644 --- a/beacon-chain/db/kv/state.go +++ b/beacon-chain/db/kv/state.go @@ -3,12 +3,8 @@ package kv import ( "bytes" "context" - "fmt" - "strings" - "sync" "github.com/boltdb/bolt" - "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -91,7 +87,7 @@ func (k *Store) GenesisState(ctx context.Context) (*pb.BeaconState, error) { func (k *Store) SaveState(ctx context.Context, state 
*pb.BeaconState, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveState") defer span.End() - enc, err := proto.Marshal(state) + enc, err := encode(state) if err != nil { return err } @@ -116,8 +112,8 @@ func (k *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error { checkpoint := &ethpb.Checkpoint{} if enc == nil { checkpoint = &ethpb.Checkpoint{Root: genesisBlockRoot} - } else { - proto.Unmarshal(enc, checkpoint) + } else if err := decode(enc, checkpoint); err != nil { + return err } // Safe guard against deleting genesis or finalized state. @@ -134,29 +130,39 @@ func (k *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error { func (k *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteStates") defer span.End() - var wg sync.WaitGroup - errs := make([]string, 0) - wg.Add(len(blockRoots)) - for _, r := range blockRoots { - go func(w *sync.WaitGroup, root [32]byte) { - defer w.Done() - if err := k.DeleteState(ctx, root); err != nil { - errs = append(errs, err.Error()) - return + + return k.db.Batch(func(tx *bolt.Tx) error { + bkt := tx.Bucket(blocksBucket) + genesisBlockRoot := bkt.Get(genesisBlockRootKey) + + bkt = tx.Bucket(checkpointBucket) + enc := bkt.Get(finalizedCheckpointKey) + checkpoint := &ethpb.Checkpoint{} + if enc == nil { + checkpoint = &ethpb.Checkpoint{Root: genesisBlockRoot} + } else if err := decode(enc, checkpoint); err != nil { + return err + } + + for _, blockRoot := range blockRoots { + // Safe guard against deleting genesis or finalized state. 
+ if bytes.Equal(blockRoot[:], checkpoint.Root) || bytes.Equal(blockRoot[:], genesisBlockRoot) { + return errors.New("could not delete genesis or finalized state") } - }(&wg, r) - } - wg.Wait() - if len(errs) > 0 { - return fmt.Errorf("deleting states failed with %d errors: %s", len(errs), strings.Join(errs, ", ")) - } - return nil + + bkt = tx.Bucket(stateBucket) + if err := bkt.Delete(blockRoot[:]); err != nil { + return err + } + } + return nil + }) } // creates state from marshaled proto state bytes. func createState(enc []byte) (*pb.BeaconState, error) { protoState := &pb.BeaconState{} - err := proto.Unmarshal(enc, protoState) + err := decode(enc, protoState) if err != nil { return nil, errors.Wrap(err, "failed to unmarshal encoding") } diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 146a4a08b0..5153b4d512 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -25,13 +25,14 @@ var log = logrus.WithField("prefix", "flags") // Flags is a struct to represent which features the client will perform on runtime. type Flags struct { - GenesisDelay bool // GenesisDelay when processing a chain start genesis event. - MinimalConfig bool // MinimalConfig as defined in the spec. - WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory. - InitSyncNoVerify bool // InitSyncNoVerify when initial syncing w/o verifying block's contents. - SkipBLSVerify bool // Skips BLS verification across the runtime. - EnableBackupWebhook bool // EnableBackupWebhook to allow database backups to trigger from monitoring port /db/backup. - PruneFinalizedStates bool // PruneFinalizedStates from the database. + GenesisDelay bool // GenesisDelay when processing a chain start genesis event. + MinimalConfig bool // MinimalConfig as defined in the spec. + WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory. 
+ InitSyncNoVerify bool // InitSyncNoVerify when initial syncing w/o verifying block's contents. + SkipBLSVerify bool // Skips BLS verification across the runtime. + EnableBackupWebhook bool // EnableBackupWebhook to allow database backups to trigger from monitoring port /db/backup. + PruneFinalizedStates bool // PruneFinalizedStates from the database. + EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database. // Cache toggles. EnableAttestationCache bool // EnableAttestationCache; see https://github.com/prysmaticlabs/prysm/issues/3106. @@ -125,6 +126,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Enabled active count cache.") cfg.EnableActiveCountCache = true } + if ctx.GlobalBool(enableSnappyDBCompressionFlag.Name) { + log.Warn("Enabled snappy compression in the database.") + cfg.EnableSnappyDBCompression = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 0437685cf6..62ec25289f 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -73,6 +73,10 @@ var ( Name: "enable-skip-slots-cache", Usage: "Enables the skip slot cache to be used in the event of skipped slots.", } + enableSnappyDBCompressionFlag = cli.BoolFlag{ + Name: "snappy", + Usage: "Enables snappy compression in the database.", + } ) // Deprecated flags list. @@ -136,4 +140,5 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ enableActiveIndicesCacheFlag, enableActiveCountCacheFlag, enableSkipSlotsCache, + enableSnappyDBCompressionFlag, }...)