From 96f1ebf7069d792e330d4cc618bdfea07180c03b Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Tue, 1 Apr 2025 15:16:58 -0500 Subject: [PATCH] more efficient ancestry db queries, stategen (#15063) Co-authored-by: Kasey --- beacon-chain/db/filters/BUILD.bazel | 10 ++- beacon-chain/db/filters/errors.go | 9 +++ beacon-chain/db/filters/filter.go | 52 +++++++++++++- beacon-chain/db/kv/blocks.go | 78 +++++++++++++++++++++ beacon-chain/state/stategen/getter.go | 5 ++ beacon-chain/state/stategen/replay.go | 80 +++++----------------- beacon-chain/state/stategen/replay_test.go | 57 +++++++-------- changelog/kasey_ancestry-queries.md | 2 + 8 files changed, 195 insertions(+), 98 deletions(-) create mode 100644 beacon-chain/db/filters/errors.go create mode 100644 changelog/kasey_ancestry-queries.md diff --git a/beacon-chain/db/filters/BUILD.bazel b/beacon-chain/db/filters/BUILD.bazel index cd94eefe3b..e4dc16069d 100644 --- a/beacon-chain/db/filters/BUILD.bazel +++ b/beacon-chain/db/filters/BUILD.bazel @@ -2,13 +2,19 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["filter.go"], + srcs = [ + "errors.go", + "filter.go", + ], importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters", visibility = [ "//beacon-chain:__subpackages__", "//tools:__subpackages__", ], - deps = ["//consensus-types/primitives:go_default_library"], + deps = [ + "//consensus-types/primitives:go_default_library", + "@com_github_pkg_errors//:go_default_library", + ], ) go_test( diff --git a/beacon-chain/db/filters/errors.go b/beacon-chain/db/filters/errors.go new file mode 100644 index 0000000000..a27fc1a2bc --- /dev/null +++ b/beacon-chain/db/filters/errors.go @@ -0,0 +1,9 @@ +package filters + +import "errors" + +var ( + ErrIncompatibleFilters = errors.New("combination of filters is not valid") + ErrNotSet = errors.New("filter was not set") + ErrInvalidQuery = errors.New("invalid query") +) diff 
--git a/beacon-chain/db/filters/filter.go b/beacon-chain/db/filters/filter.go index f617f64b7b..1625ad034e 100644 --- a/beacon-chain/db/filters/filter.go +++ b/beacon-chain/db/filters/filter.go @@ -14,7 +14,10 @@ // } package filters -import primitives "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" +import ( + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" +) // FilterType defines an enum which is used as the keys in a map that tracks // set attribute filters for data as part of the `FilterQuery` struct type. @@ -45,10 +48,34 @@ const ( SlotStep ) +// SlotRoot is the slot and root of a single block. +type SlotRoot struct { + Slot primitives.Slot + Root [32]byte +} + +// AncestryQuery is a special query that describes a chain of blocks that satisfies the invariant of: +// blocks[n].parent_root == blocks[n-1].root. +type AncestryQuery struct { + // Slot of the oldest block to return. + Earliest primitives.Slot + // Descendent is the block whose chain of ancestors is returned. + Descendent SlotRoot + set bool +} + +func (aq AncestryQuery) Span() primitives.Slot { + if aq.Earliest > aq.Descendent.Slot { + return 0 + } + return (aq.Descendent.Slot - aq.Earliest) + 1 // +1 to include upper bound +} + // QueryFilter defines a generic interface for type-asserting // specific filters to use in querying DB objects. type QueryFilter struct { - queries map[FilterType]interface{} + queries map[FilterType]interface{} + ancestry AncestryQuery } // NewFilter instantiates a new QueryFilter type used to build filters for @@ -150,3 +177,24 @@ func (q *QueryFilter) SimpleSlotRange() (primitives.Slot, primitives.Slot, bool) } return start, end, true } + +// SetAncestryQuery sets the filter to be an ancestryQuery. Note that this filter type is exclusive with +// other filters, so calling GetAncestryQuery will return an error if other values are set. 
+func (q *QueryFilter) SetAncestryQuery(aq AncestryQuery) *QueryFilter { + aq.set = true + q.ancestry = aq + return q +} + +func (q *QueryFilter) GetAncestryQuery() (AncestryQuery, error) { + if !q.ancestry.set { + return q.ancestry, ErrNotSet + } + if len(q.queries) > 0 { + return q.ancestry, errors.Wrap(ErrIncompatibleFilters, "AncestryQuery cannot be combined with other filters") + } + if q.ancestry.Earliest > q.ancestry.Descendent.Slot { + return q.ancestry, errors.Wrap(ErrInvalidQuery, "descendent slot must come after earliest slot") + } + return q.ancestry, nil +} diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index 4edf714d89..0ae31f1394 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -118,10 +118,88 @@ func (s *Store) HeadBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconB return headBlock, err } +// blocksAncestryQuery returns the descendent block together with its ancestors back to q.Earliest, +// in ascending slot order; that is: inclusive of both q.Earliest and q.Descendent.Slot. +func (s *Store) blocksAncestryQuery(ctx context.Context, q filters.AncestryQuery) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) { + // Save resources if no blocks will be found by the query. + if q.Span() < 1 { + return nil, nil, filters.ErrInvalidQuery + } + + blocks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, q.Span()) + roots := make([][32]byte, 0, q.Span()) + // Handle edge case where start and end are equal; slotRootsInRange would see end < start and err. + // So, just grab the descendent in its own tx and stop there. 
+ if q.Span() == 1 { + err := s.db.View(func(tx *bolt.Tx) error { + descendent, err := s.getBlock(ctx, q.Descendent.Root, tx) + if err != nil { + return errors.Wrap(err, "descendent block not in db") + } + blocks = append(blocks, descendent) + roots = append(roots, q.Descendent.Root) + return nil + }) + return blocks, roots, err + } + + // stop before the descendent slot since it is determined by the query + sr, err := s.slotRootsInRange(ctx, q.Earliest, q.Descendent.Slot-1, -1) + if err != nil { + return nil, nil, err + } + err = s.db.View(func(tx *bolt.Tx) error { + descendent, err := s.getBlock(ctx, q.Descendent.Root, tx) + if err != nil { + return errors.Wrap(err, "descendent block not in db") + } + proot := descendent.Block().ParentRoot() + lowest := descendent.Block().Slot() + blocks = append(blocks, descendent) + roots = append(roots, q.Descendent.Root) + // slotRootsInRange returns the roots in descending order + for _, prev := range sr { + if prev.slot < q.Earliest { + return nil + } + if prev.slot >= lowest { + continue + } + if prev.root == proot { + p, err := s.getBlock(ctx, prev.root, tx) + if err != nil { + return err + } + roots = append(roots, prev.root) + blocks = append(blocks, p) + proot = p.Block().ParentRoot() + lowest = p.Block().Slot() + } + } + return nil + }) + if err != nil { + return nil, nil, err + } + slices.Reverse(roots) + slices.Reverse(blocks) + + return blocks, roots, err +} + // Blocks retrieves a list of beacon blocks and its respective roots by filter criteria. 
func (s *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.Blocks") defer span.End() + + if q, err := f.GetAncestryQuery(); err == nil { + return s.blocksAncestryQuery(ctx, q) + } else { + if !errors.Is(err, filters.ErrNotSet) { + return nil, nil, err + } + } + blocks := make([]interfaces.ReadOnlySignedBeaconBlock, 0) blockRoots := make([][32]byte, 0) diff --git a/beacon-chain/state/stategen/getter.go b/beacon-chain/state/stategen/getter.go index c1033a5a7c..690fe1c055 100644 --- a/beacon-chain/state/stategen/getter.go +++ b/beacon-chain/state/stategen/getter.go @@ -121,6 +121,11 @@ func (s *State) StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) return cachedInfo.state, nil } + if s.beaconDB.HasState(ctx, blockRoot) { + s, err := s.beaconDB.State(ctx, blockRoot) + return s, errors.Wrap(err, "failed to retrieve init-sync state from db") + } + startState, err := s.latestAncestor(ctx, blockRoot) if err != nil { return nil, errors.Wrap(err, "could not get ancestor state") diff --git a/beacon-chain/state/stategen/replay.go b/beacon-chain/state/stategen/replay.go index ef8c3e1089..4a4837817b 100644 --- a/beacon-chain/state/stategen/replay.go +++ b/beacon-chain/state/stategen/replay.go @@ -17,8 +17,6 @@ import ( ) // ReplayBlocks replays the input blocks on the input state until the target slot is reached. -// -// WARNING Blocks passed to the function must be in decreasing slots order. func (*State) replayBlocks( ctx context.Context, state state.BeaconState, @@ -37,22 +35,20 @@ func (*State) replayBlocks( }) rLog.Debug("Replaying state") // The input block list is sorted in decreasing slots order. - if len(signed) > 0 { - for i := len(signed) - 1; i >= 0; i-- { - if ctx.Err() != nil { - return nil, ctx.Err() - } - if state.Slot() >= targetSlot { - break - } - // A node shouldn't process the block if the block slot is lower than the state slot. 
- if state.Slot() >= signed[i].Block().Slot() { - continue - } - state, err = executeStateTransitionStateGen(ctx, state, signed[i]) - if err != nil { - return nil, err - } + for _, blk := range signed { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if state.Slot() >= targetSlot { + break + } + // A node shouldn't process the block if the block slot is lower than the state slot. + if state.Slot() >= blk.Block().Slot() { + continue + } + state, err = executeStateTransitionStateGen(ctx, state, blk) + if err != nil { + return nil, err } } @@ -81,53 +77,13 @@ func (s *State) loadBlocks(ctx context.Context, startSlot, endSlot primitives.Sl if startSlot > endSlot { return nil, fmt.Errorf("start slot %d > end slot %d", startSlot, endSlot) } - filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot) - blocks, blockRoots, err := s.beaconDB.Blocks(ctx, filter) + query := filters.AncestryQuery{Earliest: startSlot, Descendent: filters.SlotRoot{Slot: endSlot, Root: endBlockRoot}} + filter := filters.NewFilter().SetAncestryQuery(query) + blocks, _, err := s.beaconDB.Blocks(ctx, filter) if err != nil { return nil, err } - // The retrieved blocks and block roots have to be in the same length given same filter. - if len(blocks) != len(blockRoots) { - return nil, errors.New("length of blocks and roots don't match") - } - // Return early if there's no block given the input. - length := len(blocks) - if length == 0 { - return nil, nil - } - - // The last retrieved block root has to match input end block root. - // Covers the edge case if there's multiple blocks on the same end slot, - // the end root may not be the last index in `blockRoots`. 
- for length >= 3 && blocks[length-1].Block().Slot() == blocks[length-2].Block().Slot() && blockRoots[length-1] != endBlockRoot { - if ctx.Err() != nil { - return nil, ctx.Err() - } - length-- - if blockRoots[length-2] == endBlockRoot { - length-- - break - } - } - - if blockRoots[length-1] != endBlockRoot { - return nil, errors.New("end block roots don't match") - } - - filteredBlocks := []interfaces.ReadOnlySignedBeaconBlock{blocks[length-1]} - // Starting from second to last index because the last block is already in the filtered block list. - for i := length - 2; i >= 0; i-- { - if ctx.Err() != nil { - return nil, ctx.Err() - } - b := filteredBlocks[len(filteredBlocks)-1] - if b.Block().ParentRoot() != blockRoots[i] { - continue - } - filteredBlocks = append(filteredBlocks, blocks[i]) - } - - return filteredBlocks, nil + return blocks, nil } // executeStateTransitionStateGen applies state transition on input historical state and block for state gen usages. diff --git a/beacon-chain/state/stategen/replay_test.go b/beacon-chain/state/stategen/replay_test.go index eac26cfc1e..f7e7e49f5f 100644 --- a/beacon-chain/state/stategen/replay_test.go +++ b/beacon-chain/state/stategen/replay_test.go @@ -280,13 +280,14 @@ func TestLoadBlocks_FirstBranch(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[8], - savedBlocks[6], - savedBlocks[4], - savedBlocks[2], - savedBlocks[1], savedBlocks[0], + savedBlocks[1], + savedBlocks[2], + savedBlocks[4], + savedBlocks[6], + savedBlocks[8], } + require.Equal(t, len(wanted), len(filteredBlocks)) for i, block := range wanted { filteredBlocksPb, err := filteredBlocks[i].Proto() @@ -311,10 +312,10 @@ func TestLoadBlocks_SecondBranch(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[5], - savedBlocks[3], - savedBlocks[1], savedBlocks[0], + savedBlocks[1], + savedBlocks[3], + savedBlocks[5], } for i, block := range wanted { @@ -340,14 +341,16 @@ func 
TestLoadBlocks_ThirdBranch(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[7], - savedBlocks[6], - savedBlocks[4], - savedBlocks[2], - savedBlocks[1], savedBlocks[0], + savedBlocks[1], + savedBlocks[2], + savedBlocks[4], + savedBlocks[6], + savedBlocks[7], } + require.Equal(t, len(wanted), len(filteredBlocks)) + for i, block := range wanted { filteredBlocksPb, err := filteredBlocks[i].Proto() require.NoError(t, err) @@ -371,11 +374,12 @@ func TestLoadBlocks_SameSlots(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[6], - savedBlocks[5], - savedBlocks[1], savedBlocks[0], + savedBlocks[1], + savedBlocks[5], + savedBlocks[6], } + require.Equal(t, len(wanted), len(filteredBlocks)) for i, block := range wanted { filteredBlocksPb, err := filteredBlocks[i].Proto() @@ -400,10 +404,11 @@ func TestLoadBlocks_SameEndSlots(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[2], - savedBlocks[1], savedBlocks[0], + savedBlocks[1], + savedBlocks[2], } + require.Equal(t, len(wanted), len(filteredBlocks)) for i, block := range wanted { filteredBlocksPb, err := filteredBlocks[i].Proto() @@ -428,9 +433,10 @@ func TestLoadBlocks_SameEndSlotsWith2blocks(t *testing.T) { require.NoError(t, err) wanted := []*ethpb.SignedBeaconBlock{ - savedBlocks[1], savedBlocks[0], + savedBlocks[1], } + require.Equal(t, len(wanted), len(filteredBlocks)) for i, block := range wanted { filteredBlocksPb, err := filteredBlocks[i].Proto() @@ -441,19 +447,6 @@ func TestLoadBlocks_SameEndSlotsWith2blocks(t *testing.T) { } } -func TestLoadBlocks_BadStart(t *testing.T) { - beaconDB := testDB.SetupDB(t) - ctx := context.Background() - s := &State{ - beaconDB: beaconDB, - } - - roots, _, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32)) - require.NoError(t, err) - _, err = s.loadBlocks(ctx, 0, 5, roots[8]) - assert.ErrorContains(t, "end block roots don't match", err) -} - // tree1 
constructs the following tree: // B0 - B1 - - B3 -- B5 // diff --git a/changelog/kasey_ancestry-queries.md b/changelog/kasey_ancestry-queries.md new file mode 100644 index 0000000000..72d0234f3e --- /dev/null +++ b/changelog/kasey_ancestry-queries.md @@ -0,0 +1,2 @@ +### Changed +- More efficient query method for stategen to retrieve blocks between a given state and the replay target block. This avoids attempting to look up blocks that are not needed for head replay queries, which may be missing due to a previous rollback bug.