mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 21:08:10 -05:00
more efficient ancestry db queries, stategen (#15063)
Co-authored-by: Kasey <kasey@users.noreply.github.com>
This commit is contained in:
@@ -2,13 +2,19 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["filter.go"],
|
||||
srcs = [
|
||||
"errors.go",
|
||||
"filter.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters",
|
||||
visibility = [
|
||||
"//beacon-chain:__subpackages__",
|
||||
"//tools:__subpackages__",
|
||||
],
|
||||
deps = ["//consensus-types/primitives:go_default_library"],
|
||||
deps = [
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
|
||||
9
beacon-chain/db/filters/errors.go
Normal file
9
beacon-chain/db/filters/errors.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package filters
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrIncompatibleFilters = errors.New("combination of filters is not valid")
|
||||
ErrNotSet = errors.New("filter was not set")
|
||||
ErrInvalidQuery = errors.New("invalid query")
|
||||
)
|
||||
@@ -14,7 +14,10 @@
|
||||
// }
|
||||
package filters
|
||||
|
||||
import primitives "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
)
|
||||
|
||||
// FilterType defines an enum which is used as the keys in a map that tracks
|
||||
// set attribute filters for data as part of the `FilterQuery` struct type.
|
||||
@@ -45,10 +48,34 @@ const (
|
||||
SlotStep
|
||||
)
|
||||
|
||||
// SlotRoot is the slot and root of a single block.
|
||||
type SlotRoot struct {
|
||||
Slot primitives.Slot
|
||||
Root [32]byte
|
||||
}
|
||||
|
||||
// AncestryQuery is a special query that describes a chain of blocks that satisfies the invariant of:
|
||||
// blocks[n].parent_root == blocks[n-1].root.
|
||||
type AncestryQuery struct {
|
||||
// Slot of oldest to return.
|
||||
Earliest primitives.Slot
|
||||
// Descendent that all ancestors in chain must descend from.
|
||||
Descendent SlotRoot
|
||||
set bool
|
||||
}
|
||||
|
||||
func (aq AncestryQuery) Span() primitives.Slot {
|
||||
if aq.Earliest > aq.Descendent.Slot {
|
||||
return 0
|
||||
}
|
||||
return (aq.Descendent.Slot - aq.Earliest) + 1 // +1 to include upper bound
|
||||
}
|
||||
|
||||
// QueryFilter defines a generic interface for type-asserting
|
||||
// specific filters to use in querying DB objects.
|
||||
type QueryFilter struct {
|
||||
queries map[FilterType]interface{}
|
||||
queries map[FilterType]interface{}
|
||||
ancestry AncestryQuery
|
||||
}
|
||||
|
||||
// NewFilter instantiates a new QueryFilter type used to build filters for
|
||||
@@ -150,3 +177,24 @@ func (q *QueryFilter) SimpleSlotRange() (primitives.Slot, primitives.Slot, bool)
|
||||
}
|
||||
return start, end, true
|
||||
}
|
||||
|
||||
// SetAncestryQuery sets the filter to be an ancestryQuery. Note that this filter type is exclusive with
|
||||
// other filters, so call ing GetAncestryQuery will return an error if other values are set.
|
||||
func (q *QueryFilter) SetAncestryQuery(aq AncestryQuery) *QueryFilter {
|
||||
aq.set = true
|
||||
q.ancestry = aq
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *QueryFilter) GetAncestryQuery() (AncestryQuery, error) {
|
||||
if !q.ancestry.set {
|
||||
return q.ancestry, ErrNotSet
|
||||
}
|
||||
if len(q.queries) > 0 {
|
||||
return q.ancestry, errors.Wrap(ErrIncompatibleFilters, "AncestryQuery cannot be combined with other filters")
|
||||
}
|
||||
if q.ancestry.Earliest > q.ancestry.Descendent.Slot {
|
||||
return q.ancestry, errors.Wrap(ErrInvalidQuery, "descendent slot must come after earliest slot")
|
||||
}
|
||||
return q.ancestry, nil
|
||||
}
|
||||
|
||||
@@ -118,10 +118,88 @@ func (s *Store) HeadBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconB
|
||||
return headBlock, err
|
||||
}
|
||||
|
||||
// blocksAncestryQuery returns all blocks *before* the descendent block;
|
||||
// that is: inclusive of q.Earliest, exclusive of q.Descendent.Slot.
|
||||
func (s *Store) blocksAncestryQuery(ctx context.Context, q filters.AncestryQuery) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) {
|
||||
// Save resources if no blocks will be found by the query.
|
||||
if q.Span() < 1 {
|
||||
return nil, nil, filters.ErrInvalidQuery
|
||||
}
|
||||
|
||||
blocks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, q.Span())
|
||||
roots := make([][32]byte, 0, q.Span())
|
||||
// Handle edge case where start and end are equal; slotRootsInRange would see end < start and err.
|
||||
// So, just grab the descendent in its own tx and stop there.
|
||||
if q.Span() == 1 {
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
descendent, err := s.getBlock(ctx, q.Descendent.Root, tx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "descendent block not in db")
|
||||
}
|
||||
blocks = append(blocks, descendent)
|
||||
roots = append(roots, q.Descendent.Root)
|
||||
return nil
|
||||
})
|
||||
return blocks, roots, err
|
||||
}
|
||||
|
||||
// stop before the descendent slot since it is determined by the query
|
||||
sr, err := s.slotRootsInRange(ctx, q.Earliest, q.Descendent.Slot-1, -1)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
err = s.db.View(func(tx *bolt.Tx) error {
|
||||
descendent, err := s.getBlock(ctx, q.Descendent.Root, tx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "descendent block not in db")
|
||||
}
|
||||
proot := descendent.Block().ParentRoot()
|
||||
lowest := descendent.Block().Slot()
|
||||
blocks = append(blocks, descendent)
|
||||
roots = append(roots, q.Descendent.Root)
|
||||
// slotRootsInRange returns the roots in descending order
|
||||
for _, prev := range sr {
|
||||
if prev.slot < q.Earliest {
|
||||
return nil
|
||||
}
|
||||
if prev.slot >= lowest {
|
||||
continue
|
||||
}
|
||||
if prev.root == proot {
|
||||
p, err := s.getBlock(ctx, prev.root, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roots = append(roots, prev.root)
|
||||
blocks = append(blocks, p)
|
||||
proot = p.Block().ParentRoot()
|
||||
lowest = p.Block().Slot()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
slices.Reverse(roots)
|
||||
slices.Reverse(blocks)
|
||||
|
||||
return blocks, roots, err
|
||||
}
|
||||
|
||||
// Blocks retrieves a list of beacon blocks and its respective roots by filter criteria.
|
||||
func (s *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.Blocks")
|
||||
defer span.End()
|
||||
|
||||
if q, err := f.GetAncestryQuery(); err == nil {
|
||||
return s.blocksAncestryQuery(ctx, q)
|
||||
} else {
|
||||
if !errors.Is(err, filters.ErrNotSet) {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
blocks := make([]interfaces.ReadOnlySignedBeaconBlock, 0)
|
||||
blockRoots := make([][32]byte, 0)
|
||||
|
||||
|
||||
@@ -121,6 +121,11 @@ func (s *State) StateByRootInitialSync(ctx context.Context, blockRoot [32]byte)
|
||||
return cachedInfo.state, nil
|
||||
}
|
||||
|
||||
if s.beaconDB.HasState(ctx, blockRoot) {
|
||||
s, err := s.beaconDB.State(ctx, blockRoot)
|
||||
return s, errors.Wrap(err, "failed to retrieve init-sync state from db")
|
||||
}
|
||||
|
||||
startState, err := s.latestAncestor(ctx, blockRoot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get ancestor state")
|
||||
|
||||
@@ -17,8 +17,6 @@ import (
|
||||
)
|
||||
|
||||
// ReplayBlocks replays the input blocks on the input state until the target slot is reached.
|
||||
//
|
||||
// WARNING Blocks passed to the function must be in decreasing slots order.
|
||||
func (*State) replayBlocks(
|
||||
ctx context.Context,
|
||||
state state.BeaconState,
|
||||
@@ -37,22 +35,20 @@ func (*State) replayBlocks(
|
||||
})
|
||||
rLog.Debug("Replaying state")
|
||||
// The input block list is sorted in decreasing slots order.
|
||||
if len(signed) > 0 {
|
||||
for i := len(signed) - 1; i >= 0; i-- {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
if state.Slot() >= targetSlot {
|
||||
break
|
||||
}
|
||||
// A node shouldn't process the block if the block slot is lower than the state slot.
|
||||
if state.Slot() >= signed[i].Block().Slot() {
|
||||
continue
|
||||
}
|
||||
state, err = executeStateTransitionStateGen(ctx, state, signed[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, blk := range signed {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
if state.Slot() >= targetSlot {
|
||||
break
|
||||
}
|
||||
// A node shouldn't process the block if the block slot is lower than the state slot.
|
||||
if state.Slot() >= blk.Block().Slot() {
|
||||
continue
|
||||
}
|
||||
state, err = executeStateTransitionStateGen(ctx, state, blk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,53 +77,13 @@ func (s *State) loadBlocks(ctx context.Context, startSlot, endSlot primitives.Sl
|
||||
if startSlot > endSlot {
|
||||
return nil, fmt.Errorf("start slot %d > end slot %d", startSlot, endSlot)
|
||||
}
|
||||
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
|
||||
blocks, blockRoots, err := s.beaconDB.Blocks(ctx, filter)
|
||||
query := filters.AncestryQuery{Earliest: startSlot, Descendent: filters.SlotRoot{Slot: endSlot, Root: endBlockRoot}}
|
||||
filter := filters.NewFilter().SetAncestryQuery(query)
|
||||
blocks, _, err := s.beaconDB.Blocks(ctx, filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// The retrieved blocks and block roots have to be in the same length given same filter.
|
||||
if len(blocks) != len(blockRoots) {
|
||||
return nil, errors.New("length of blocks and roots don't match")
|
||||
}
|
||||
// Return early if there's no block given the input.
|
||||
length := len(blocks)
|
||||
if length == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// The last retrieved block root has to match input end block root.
|
||||
// Covers the edge case if there's multiple blocks on the same end slot,
|
||||
// the end root may not be the last index in `blockRoots`.
|
||||
for length >= 3 && blocks[length-1].Block().Slot() == blocks[length-2].Block().Slot() && blockRoots[length-1] != endBlockRoot {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
length--
|
||||
if blockRoots[length-2] == endBlockRoot {
|
||||
length--
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if blockRoots[length-1] != endBlockRoot {
|
||||
return nil, errors.New("end block roots don't match")
|
||||
}
|
||||
|
||||
filteredBlocks := []interfaces.ReadOnlySignedBeaconBlock{blocks[length-1]}
|
||||
// Starting from second to last index because the last block is already in the filtered block list.
|
||||
for i := length - 2; i >= 0; i-- {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
b := filteredBlocks[len(filteredBlocks)-1]
|
||||
if b.Block().ParentRoot() != blockRoots[i] {
|
||||
continue
|
||||
}
|
||||
filteredBlocks = append(filteredBlocks, blocks[i])
|
||||
}
|
||||
|
||||
return filteredBlocks, nil
|
||||
return blocks, nil
|
||||
}
|
||||
|
||||
// executeStateTransitionStateGen applies state transition on input historical state and block for state gen usages.
|
||||
|
||||
@@ -280,13 +280,14 @@ func TestLoadBlocks_FirstBranch(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[8],
|
||||
savedBlocks[6],
|
||||
savedBlocks[4],
|
||||
savedBlocks[2],
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
savedBlocks[2],
|
||||
savedBlocks[4],
|
||||
savedBlocks[6],
|
||||
savedBlocks[8],
|
||||
}
|
||||
require.Equal(t, len(wanted), len(filteredBlocks))
|
||||
|
||||
for i, block := range wanted {
|
||||
filteredBlocksPb, err := filteredBlocks[i].Proto()
|
||||
@@ -311,10 +312,10 @@ func TestLoadBlocks_SecondBranch(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[5],
|
||||
savedBlocks[3],
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
savedBlocks[3],
|
||||
savedBlocks[5],
|
||||
}
|
||||
|
||||
for i, block := range wanted {
|
||||
@@ -340,14 +341,16 @@ func TestLoadBlocks_ThirdBranch(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[7],
|
||||
savedBlocks[6],
|
||||
savedBlocks[4],
|
||||
savedBlocks[2],
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
savedBlocks[2],
|
||||
savedBlocks[4],
|
||||
savedBlocks[6],
|
||||
savedBlocks[7],
|
||||
}
|
||||
|
||||
require.Equal(t, len(wanted), len(filteredBlocks))
|
||||
|
||||
for i, block := range wanted {
|
||||
filteredBlocksPb, err := filteredBlocks[i].Proto()
|
||||
require.NoError(t, err)
|
||||
@@ -371,11 +374,12 @@ func TestLoadBlocks_SameSlots(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[6],
|
||||
savedBlocks[5],
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
savedBlocks[5],
|
||||
savedBlocks[6],
|
||||
}
|
||||
require.Equal(t, len(wanted), len(filteredBlocks))
|
||||
|
||||
for i, block := range wanted {
|
||||
filteredBlocksPb, err := filteredBlocks[i].Proto()
|
||||
@@ -400,10 +404,11 @@ func TestLoadBlocks_SameEndSlots(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[2],
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
savedBlocks[2],
|
||||
}
|
||||
require.Equal(t, len(wanted), len(filteredBlocks))
|
||||
|
||||
for i, block := range wanted {
|
||||
filteredBlocksPb, err := filteredBlocks[i].Proto()
|
||||
@@ -428,9 +433,10 @@ func TestLoadBlocks_SameEndSlotsWith2blocks(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wanted := []*ethpb.SignedBeaconBlock{
|
||||
savedBlocks[1],
|
||||
savedBlocks[0],
|
||||
savedBlocks[1],
|
||||
}
|
||||
require.Equal(t, len(wanted), len(filteredBlocks))
|
||||
|
||||
for i, block := range wanted {
|
||||
filteredBlocksPb, err := filteredBlocks[i].Proto()
|
||||
@@ -441,19 +447,6 @@ func TestLoadBlocks_SameEndSlotsWith2blocks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBlocks_BadStart(t *testing.T) {
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
s := &State{
|
||||
beaconDB: beaconDB,
|
||||
}
|
||||
|
||||
roots, _, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
|
||||
require.NoError(t, err)
|
||||
_, err = s.loadBlocks(ctx, 0, 5, roots[8])
|
||||
assert.ErrorContains(t, "end block roots don't match", err)
|
||||
}
|
||||
|
||||
// tree1 constructs the following tree:
|
||||
// B0 - B1 - - B3 -- B5
|
||||
//
|
||||
|
||||
2
changelog/kasey_ancestry-queries.md
Normal file
2
changelog/kasey_ancestry-queries.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- More efficient query method for stategen to retrieve blocks between a given state and the replay target block. This avoids attempting to look up blocks that are not needed for head replay queries, which may be missing due to a previous rollback bug.
|
||||
Reference in New Issue
Block a user