Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-10 05:47:59 -05:00)

Compare commits: fusaka-dev ... backfill-d (1 commit)

Commit: b114d5cf99
@@ -190,3 +190,29 @@ func computeInfoCacheKey(nodeID enode.ID, custodyGroupCount uint64) [nodeInfoCac
    return key
}

// ColumnIndices is a map of column indices where the key is the column index and the value is a boolean.
// The boolean could indicate different things, eg whether the column is needed (in the context of satisfying custody requirements)
// or present (in the context of a custody check on disk or in cache).
type ColumnIndices map[uint64]bool

// CopyTrueIndices allows callers to get a copy of the given ColumnIndices, filtering out any keys
// where the value == `false`.
func CopyTrueIndices(src ColumnIndices) ColumnIndices {
    dst := make(ColumnIndices, len(src))
    for k, v := range src {
        if v {
            dst[k] = true
        }
    }
    return dst
}

// ColumnIndicesFromSlice converts a slice of uint64 indices into the ColumnIndices equivalent.
func ColumnIndicesFromSlice(indices []uint64) ColumnIndices {
    ci := make(ColumnIndices, len(indices))
    for _, index := range indices {
        ci[index] = true
    }
    return ci
}
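As a quick illustration of the two helpers above (the values are hypothetical, not taken from the change), a caller can build the map from a slice and later drop any entries that have been flipped to false:

    indices := peerdas.ColumnIndicesFromSlice([]uint64{1, 5, 9}) // {1: true, 5: true, 9: true}
    indices[5] = false                                           // e.g. column 5 is no longer needed
    stillNeeded := peerdas.CopyTrueIndices(indices)              // {1: true, 9: true}; false keys are filtered out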
@@ -14,6 +14,12 @@ import (
// IsDataAvailable guarantees that all blobs committed to in the block have been
// durably persisted before returning a non-error value.
type AvailabilityStore interface {
    IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
    AvailabilityChecker
    Persist(current primitives.Slot, sc ...blocks.ROSidecar) error
}

// AvailabilityChecker is the minimum interface needed to check if data is available for a block.
// We should prefer this interface over AvailabilityStore in places where we don't need to persist blob data.
type AvailabilityChecker interface {
    IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
}
@@ -216,6 +216,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
        beacon.BackfillOpts,
        backfill.WithVerifierWaiter(beacon.verifyInitWaiter),
        backfill.WithInitSyncWaiter(initSyncWaiter(ctx, beacon.initialSyncComplete)),
        backfill.WithCustodyInfo(beacon.custodyInfo),
    )

    if err := registerServices(cliCtx, beacon, synchronizer, bfs); err != nil {
@@ -1182,7 +1183,7 @@ func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error {
func (b *BeaconNode) RegisterBackfillService(cliCtx *cli.Context, bfs *backfill.Store) error {
    pa := peers.NewAssigner(b.fetchP2P().Peers(), b.forkChoicer)
    // TODO: Add backfill for data column storage
    bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...)
    bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.DataColumnStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...)
    if err != nil {
        return errors.Wrap(err, "error initializing backfill service")
    }
@@ -42,7 +42,7 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) {
    if flags.Get().MinimumSyncPeers < required {
        required = flags.Get().MinimumSyncPeers
    }
    _, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.fc.FinalizedCheckpoint().Epoch)
    _, peers := a.ps.BestFinalized(-1, a.fc.FinalizedCheckpoint().Epoch)
    if len(peers) < required {
        log.WithFields(logrus.Fields{
            "suitable": len(peers),
@@ -52,27 +52,33 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) {
    return peers, nil
}

type AssignmentFilter func([]peer.ID) []peer.ID

// Assign uses the "BestFinalized" method to select the best peers that agree on a canonical block
// for the configured finalized epoch. At most `n` peers will be returned. The `busy` param can be used
// to filter out peers that we know we don't want to connect to, for instance if we are trying to limit
// the number of outbound requests to each peer from a given component.
func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
func (a *Assigner) Assign(filter AssignmentFilter) ([]peer.ID, error) {
    best, err := a.freshPeers()
    if err != nil {
        return nil, err
    }
    return pickBest(busy, n, best), nil
    return filter(best), nil
}

func pickBest(busy map[peer.ID]bool, n int, best []peer.ID) []peer.ID {
    ps := make([]peer.ID, 0, n)
    for _, p := range best {
        if len(ps) == n {
            return ps
        }
        if !busy[p] {
            ps = append(ps, p)
// NotBusy is a filter that returns a list of peer.IDs with len() <= n, which are not in the `busy` map.
// n == -1 will return all peers that are not busy.
func NotBusy(busy map[peer.ID]bool, n int) AssignmentFilter {
    return func(peers []peer.ID) []peer.ID {
        ps := make([]peer.ID, 0)
        for _, p := range peers {
            if n > 0 && len(ps) == n {
                return ps
            }
            if !busy[p] {
                ps = append(ps, p)
            }
        }
        return ps
    }
    return ps
}
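A minimal sketch of how a caller might use the new filter-based API (the assigner value and busy map here are illustrative, not from the diff):

    busy := map[peer.ID]bool{}                                // peers we already have outstanding requests to
    assigned, err := assigner.Assign(peers.NotBusy(busy, 4))  // up to 4 non-busy peers; n == -1 means "all non-busy"
    if err != nil {
        return err // e.g. peers.ErrInsufficientSuitable when too few suitable peers are connected
    }
    for _, pid := range assigned {
        busy[pid] = true // mark assigned peers busy before dispatching work to them
    }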
@@ -18,8 +18,9 @@ func TestPickBest(t *testing.T) {
        expected []peer.ID
    }{
        {
            name: "",
            n:    0,
            name:     "don't limit",
            n:        0,
            expected: best,
        },
        {
            name: "none busy",
@@ -88,7 +89,8 @@ func TestPickBest(t *testing.T) {
        if c.best == nil {
            c.best = best
        }
        pb := pickBest(c.busy, c.n, c.best)
        filt := NotBusy(c.busy, c.n)
        pb := filt(c.best)
        require.Equal(t, len(c.expected), len(pb))
        for i := range c.expected {
            require.Equal(t, c.expected[i], pb[i])
@@ -771,7 +771,7 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch primitives.Epoch)
    }

    // Trim potential peers to at most maxPeers.
    if len(potentialPIDs) > maxPeers {
    if maxPeers > 0 && len(potentialPIDs) > maxPeers {
        potentialPIDs = potentialPIDs[:maxPeers]
    }
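For illustration (the variables below are hypothetical), the new guard makes a non-positive maxPeers mean "do not trim", which is what the assigner relies on when it passes -1:

    _, all := status.BestFinalized(-1, finalizedEpoch) // -1: return every peer that agrees, no trimming
    _, top := status.BestFinalized(5, finalizedEpoch)  // 5: at most five peers, as before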
@@ -7,6 +7,7 @@ go_library(
|
||||
"block_batcher.go",
|
||||
"broadcast_bls_changes.go",
|
||||
"context.go",
|
||||
"data_column_assignment.go",
|
||||
"data_columns.go",
|
||||
"data_columns_reconstruct.go",
|
||||
"data_columns_sampling.go",
|
||||
|
||||
@@ -6,6 +6,8 @@ go_library(
|
||||
"batch.go",
|
||||
"batcher.go",
|
||||
"blobs.go",
|
||||
"columns.go",
|
||||
"fulu_transition.go",
|
||||
"log.go",
|
||||
"metrics.go",
|
||||
"pool.go",
|
||||
@@ -18,6 +20,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/peerdas:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/das:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
@@ -41,6 +44,7 @@ go_library(
|
||||
"//runtime:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
@@ -67,6 +71,7 @@ go_test(
|
||||
"//beacon-chain/das:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//beacon-chain/db/filesystem:go_default_library",
|
||||
"//beacon-chain/p2p/peers:go_default_library",
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//beacon-chain/startup:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
@@ -38,8 +37,10 @@ func (s batchState) String() string {
|
||||
return "import_complete"
|
||||
case batchEndSequence:
|
||||
return "end_sequence"
|
||||
case batchSidecarSync:
|
||||
return "sidecar_sync"
|
||||
case batchSyncBlobs:
|
||||
return "sync_blobs"
|
||||
case batchSyncColumns:
|
||||
return "sync_columns"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
@@ -50,7 +51,9 @@ const (
|
||||
batchInit
|
||||
batchSequenced
|
||||
batchErrRetryable
|
||||
batchSidecarSync
|
||||
batchErrFatal
|
||||
batchSyncBlobs
|
||||
batchSyncColumns
|
||||
batchImportable
|
||||
batchImportComplete
|
||||
batchEndSequence
|
||||
@@ -72,9 +75,12 @@ type batch struct {
|
||||
err error
|
||||
state batchState
|
||||
busy peer.ID
|
||||
nextReqCols []uint64
|
||||
blockPid peer.ID
|
||||
blobPid peer.ID
|
||||
columnPid peer.ID
|
||||
bs *blobSync
|
||||
cs *columnSync
|
||||
}
|
||||
|
||||
func (b batch) logFields() logrus.Fields {
|
||||
@@ -93,6 +99,9 @@ func (b batch) logFields() logrus.Fields {
|
||||
if b.retries > 0 {
|
||||
f["retryAfter"] = b.retryAfter.String()
|
||||
}
|
||||
if b.state == batchSyncColumns {
|
||||
f["nextColumns"] = fmt.Sprintf("%v", b.nextReqCols)
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
@@ -136,22 +145,29 @@ func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {
|
||||
func (b batch) postBlockSync(results verifiedROBlocks, bs *blobSync, cs *columnSync) batch {
|
||||
b.results = results
|
||||
b.bs = bs
|
||||
b.cs = cs
|
||||
if bs.blobsNeeded() > 0 {
|
||||
return b.withState(batchSidecarSync)
|
||||
return b.withState(batchSyncBlobs)
|
||||
}
|
||||
if len(cs.columnsNeeded()) > 0 {
|
||||
return b.withState(batchSyncColumns)
|
||||
}
|
||||
return b.withState(batchImportable)
|
||||
}
|
||||
|
||||
func (b batch) postBlobSync() batch {
|
||||
func (b batch) postSidecarSync() batch {
|
||||
if b.blobsNeeded() > 0 {
|
||||
log.WithFields(b.logFields()).WithField("blobsMissing", b.blobsNeeded()).Error("Batch still missing blobs after downloading from peer")
|
||||
b.bs = nil
|
||||
b.results = []blocks.ROBlock{}
|
||||
return b.withState(batchErrRetryable)
|
||||
}
|
||||
if len(b.cs.columnsNeeded()) > 0 {
|
||||
return b.withState(batchSyncColumns)
|
||||
}
|
||||
return b.withState(batchImportable)
|
||||
}
|
||||
|
||||
@@ -187,6 +203,11 @@ func (b batch) withRetryableError(err error) batch {
|
||||
return b.withState(batchErrRetryable)
|
||||
}
|
||||
|
||||
func (b batch) withFatalError(err error) batch {
|
||||
b.err = errors.Wrap(err, "fatal error in batch")
|
||||
return b.withState(batchErrFatal)
|
||||
}
|
||||
|
||||
func (b batch) blobsNeeded() int {
|
||||
return b.bs.blobsNeeded()
|
||||
}
|
||||
@@ -195,8 +216,8 @@ func (b batch) blobResponseValidator() sync.BlobResponseValidation {
|
||||
return b.bs.validateNext
|
||||
}
|
||||
|
||||
func (b batch) availabilityStore() das.AvailabilityStore {
|
||||
return b.bs.store
|
||||
func (b batch) validatingColumnRequest() *validatingColumnRequest {
|
||||
return b.cs.newValidatingColumnRequest(b.nextReqCols)
|
||||
}
|
||||
|
||||
var batchBlockUntil = func(ctx context.Context, untilRetry time.Duration, b batch) error {
|
||||
@@ -223,6 +244,21 @@ func (b batch) waitUntilReady(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b batch) workComplete() bool {
|
||||
if b.state == batchImportable {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (b batch) selectPeer(matrix *sync.ColumnPeerRank, busy map[peer.ID]bool) (peer.ID, []uint64, error) {
|
||||
if b.state == batchSyncColumns {
|
||||
return matrix.HighestForIndices(b.cs.columnsNeeded(), busy)
|
||||
}
|
||||
peer, err := matrix.Lowest(busy)
|
||||
return peer, nil, err
|
||||
}
|
||||
|
||||
func sortBatchDesc(bb []batch) {
|
||||
sort.Slice(bb, func(i, j int) bool {
|
||||
return bb[i].end > bb[j].end
|
||||
|
||||
beacon-chain/sync/backfill/columns.go (new file, 250 lines)
@@ -0,0 +1,250 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidResponseOrder = errors.New("out of order DataColumnSidecar response")
|
||||
errColumnResponseSlotOutOfRange = errors.New("slot out of range for DataColumnSidecar response")
|
||||
errColumnIndexNotRequested = errors.New("index in DataColumnSidecar response not requested")
|
||||
)
|
||||
|
||||
type columnBatch struct {
|
||||
first primitives.Slot
|
||||
last primitives.Slot
|
||||
custodyRequirement peerdas.ColumnIndices
|
||||
blockColumnsByRoot map[[32]byte]*blockColumns
|
||||
peerRank *sync.ColumnPeerRank
|
||||
}
|
||||
|
||||
type blockColumns struct {
|
||||
remaining peerdas.ColumnIndices
|
||||
commitments [][]byte
|
||||
}
|
||||
|
||||
func (cs *columnBatch) needed() peerdas.ColumnIndices {
|
||||
if len(cs.custodyRequirement) == 0 {
|
||||
return nil
|
||||
}
|
||||
search := peerdas.CopyTrueIndices(cs.custodyRequirement)
|
||||
ci := make(peerdas.ColumnIndices, len(search))
|
||||
// avoid iterating every single block+index by only searching for indices
|
||||
// we haven't found yet.
|
||||
for _, v := range cs.blockColumnsByRoot {
|
||||
for col := range search {
|
||||
if v.remaining[col] {
|
||||
ci[col] = true
|
||||
// We found the column, so we can delete it from the search.
|
||||
delete(search, col)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ci
|
||||
}
|
||||
|
||||
type columnSync struct {
|
||||
*columnBatch
|
||||
store das.AvailabilityStore
|
||||
current primitives.Slot
|
||||
}
|
||||
|
||||
func newColumnSync(b batch, blks verifiedROBlocks, current primitives.Slot, p p2p.P2P, vbs verifiedROBlocks, cfg *workerCfg) (*columnSync, error) {
|
||||
cb, err := buildColumnBatch(b, blks, p.NodeID(), cfg.custodyInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cb == nil {
|
||||
return &columnSync{}, nil
|
||||
}
|
||||
return &columnSync{
|
||||
columnBatch: cb,
|
||||
current: current,
|
||||
store: das.NewLazilyPersistentStoreColumn(cfg.cfs, p.NodeID(), cfg.ndcv, cfg.custodyInfo),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (cs *columnSync) blockColumns(root [32]byte) *blockColumns {
|
||||
if cs.columnBatch == nil {
|
||||
return nil
|
||||
}
|
||||
return cs.columnBatch.blockColumnsByRoot[root]
|
||||
}
|
||||
|
||||
func (cs *columnSync) columnsNeeded() peerdas.ColumnIndices {
|
||||
if cs.columnBatch == nil {
|
||||
return nil
|
||||
}
|
||||
return cs.columnBatch.needed()
|
||||
}
|
||||
|
||||
func (cs *columnSync) request(reqCols []uint64) *ethpb.DataColumnSidecarsByRangeRequest {
|
||||
return sync.DataColumnSidecarsByRangeRequest(reqCols, cs.first, cs.last)
|
||||
}
|
||||
|
||||
func (cs *columnSync) newValidatingColumnRequest(cols []uint64) *validatingColumnRequest {
|
||||
req := cs.request(cols)
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
return &validatingColumnRequest{
|
||||
req: req,
|
||||
columns: peerdas.ColumnIndicesFromSlice(cols),
|
||||
cs: cs,
|
||||
}
|
||||
}
|
||||
|
||||
type validatingColumnRequest struct {
|
||||
last primitives.Slot
|
||||
req *ethpb.DataColumnSidecarsByRangeRequest
|
||||
columns map[uint64]bool
|
||||
cs *columnSync
|
||||
}
|
||||
|
||||
func (v *validatingColumnRequest) validate(cd blocks.RODataColumn) bool {
|
||||
return recordColumnSidecarDownload(cd, v.countedValidation(cd))
|
||||
}
|
||||
|
||||
func recordColumnSidecarDownload(cd blocks.RODataColumn, valid bool) bool {
|
||||
validity := "invalid"
|
||||
if valid {
|
||||
validity = "valid"
|
||||
}
|
||||
backfillDataColumnSidecarDownloaded.WithLabelValues(fmt.Sprintf("%d", cd.Index), validity).Inc()
|
||||
backfillBytesDataColumnSidecar.Add(float64(cd.SizeSSZ()))
|
||||
return valid
|
||||
}
|
||||
|
||||
// When we call Persist we'll get the verification checks that are provided by the availability store.
|
||||
// In addition to those checks this function calls rpcValidity which maintains a state machine across
|
||||
// response values to ensure that the response is valid in the context of the overall request,
|
||||
// like making sure that the block root is one of the ones we expect based on the blocks we used to
|
||||
// construct the request. It also does cheap sanity checks on the DataColumnSidecar values like
|
||||
// ensuring that the commitments line up with the block.
|
||||
func (v *validatingColumnRequest) countedValidation(cd blocks.RODataColumn) bool {
|
||||
if err := v.rpcValidity(cd); err != nil {
|
||||
log.WithError(err).WithField("slot", cd.Slot()).WithField("index", cd.Index).Error("invalid data column sidecar response")
|
||||
return false
|
||||
}
|
||||
root := cd.BlockRoot()
|
||||
expected := v.cs.blockColumns(root)
|
||||
if expected == nil {
|
||||
return false
|
||||
}
|
||||
// We don't need this column, but we trust the column state machine verified we asked for it as part of a range request.
|
||||
// So we can just skip over it and not try to persist it.
|
||||
if !expected.remaining[cd.Index] {
|
||||
return true
|
||||
}
|
||||
if len(cd.KzgCommitments) != len(expected.commitments) {
|
||||
log.WithField("slot", cd.Slot()).WithField("index", cd.Index).Error("unexpected number of commitments in data column sidecar")
|
||||
return false
|
||||
}
|
||||
for i, cmt := range cd.KzgCommitments {
|
||||
if !bytes.Equal(cmt, expected.commitments[i]) {
|
||||
log.WithField("slot", cd.Slot()).WithField("index", cd.Index).WithField("cmtIndex", i).Error("commitment in data column sidecar does not match expected commitment")
|
||||
return false
|
||||
}
|
||||
}
|
||||
if err := v.cs.store.Persist(v.cs.current, blocks.NewSidecarFromDataColumnSidecar(cd)); err != nil {
|
||||
log.WithError(err).Error("failed to persist data column")
|
||||
return false
|
||||
}
|
||||
delete(expected.remaining, cd.Index)
|
||||
return true
|
||||
}
|
||||
|
||||
// rpcValidity checks that the individual DataColumnSidecar value is valid in the context of the overall response
|
||||
// respecting the p2p spec rules for DataColumnSidecarByRange responses:
|
||||
// - values are in the requested slot range
|
||||
// - values are in slot order
|
||||
// - block roots are canonical wrt the blocks we believe are canonical
|
||||
// (assuming previous block response from another peer was honest)
|
||||
// - there are not too many values in the response
|
||||
// - the column index is one of the requested columns
|
||||
func (v *validatingColumnRequest) rpcValidity(col blocks.RODataColumn) error {
|
||||
slot := col.Slot()
|
||||
if v.last > slot {
|
||||
return errInvalidResponseOrder
|
||||
}
|
||||
if slot < v.req.StartSlot {
|
||||
return errors.Wrap(errColumnResponseSlotOutOfRange, "sidecar slot before request start")
|
||||
}
|
||||
if slot >= v.req.StartSlot+primitives.Slot(v.req.Count) {
|
||||
return errors.Wrap(errColumnResponseSlotOutOfRange, "sidecar slot after request end")
|
||||
}
|
||||
// This is an important check because we may have already satisfied this column for a given
|
||||
// block root but still requested it for the benefit of other blocks in the batch. So this check ensures
|
||||
// that it was part of the overall batch request.
|
||||
if !v.columns[col.Index] {
|
||||
return errColumnIndexNotRequested
|
||||
}
|
||||
v.last = col.Slot()
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildColumnBatch(b batch, fuluBlocks verifiedROBlocks, nodeID enode.ID, ci *peerdas.CustodyInfo) (*columnBatch, error) {
|
||||
if len(fuluBlocks) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
fuluStart := params.BeaconConfig().FuluForkEpoch
|
||||
// If the batch end slot or last result block are pre-fulu, so are the rest.
|
||||
if slots.ToEpoch(b.end) < fuluStart || slots.ToEpoch(fuluBlocks[len(fuluBlocks)-1].Block().Slot()) < fuluStart {
|
||||
return nil, nil
|
||||
}
|
||||
// The last block in the batch is in fulu, but the first one is not.
|
||||
// Find the index of the first fulu block to exclude the pre-fulu blocks.
|
||||
if slots.ToEpoch(fuluBlocks[0].Block().Slot()) < fuluStart {
|
||||
fuluStart := sort.Search(len(fuluBlocks), func(i int) bool {
|
||||
return slots.ToEpoch(fuluBlocks[i].Block().Slot()) >= fuluStart
|
||||
})
|
||||
fuluBlocks = fuluBlocks[fuluStart:]
|
||||
}
|
||||
// Get the custody group sampling size for the node.
|
||||
custodyGroupSamplingSize := ci.CustodyGroupSamplingSize(peerdas.Actual)
|
||||
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupSamplingSize)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "peer info")
|
||||
}
|
||||
indices := peerdas.CopyTrueIndices(peerInfo.CustodyColumns)
|
||||
|
||||
summary := &columnBatch{
|
||||
custodyRequirement: indices,
|
||||
blockColumnsByRoot: make(map[[32]byte]*blockColumns, len(fuluBlocks)),
|
||||
}
|
||||
for _, b := range fuluBlocks {
|
||||
cmts, err := b.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get blob kzg commitments")
|
||||
}
|
||||
if len(cmts) == 0 {
|
||||
continue
|
||||
}
|
||||
slot := b.Block().Slot()
|
||||
if len(summary.blockColumnsByRoot) == 0 {
|
||||
summary.first = slot
|
||||
}
|
||||
summary.blockColumnsByRoot[b.Root()] = &blockColumns{
|
||||
remaining: peerdas.CopyTrueIndices(indices),
|
||||
commitments: cmts,
|
||||
}
|
||||
summary.last = slot
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
beacon-chain/sync/backfill/fulu_transition.go (new file, 54 lines)
@@ -0,0 +1,54 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var errAvailabilityCheckerInvalid = errors.New("invalid availability checker state")
|
||||
|
||||
type multiStore struct {
|
||||
fuluStart primitives.Slot
|
||||
columnStore das.AvailabilityStore
|
||||
blobStore das.AvailabilityStore
|
||||
}
|
||||
|
||||
// multiStore satisfies the das.AvailabilityChecker interface.
|
||||
var _ das.AvailabilityChecker = &multiStore{}
|
||||
|
||||
// IsDataAvailable implements the das.AvailabilityStore interface.
|
||||
func (m *multiStore) IsDataAvailable(ctx context.Context, current primitives.Slot, blk blocks.ROBlock) error {
|
||||
if blk.Block().Slot() < m.fuluStart {
|
||||
return m.checkAvailabilityWithFallback(ctx, m.blobStore, current, blk)
|
||||
}
|
||||
return m.checkAvailabilityWithFallback(ctx, m.columnStore, current, blk)
|
||||
}
|
||||
|
||||
func (m *multiStore) checkAvailabilityWithFallback(ctx context.Context, ac das.AvailabilityChecker, current primitives.Slot, blk blocks.ROBlock) error {
|
||||
if ac != nil {
|
||||
return ac.IsDataAvailable(ctx, current, blk)
|
||||
}
|
||||
cmts, err := blk.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(cmts) > 0 {
|
||||
return errAvailabilityCheckerInvalid
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newMultiStore(fuluStart primitives.Slot, b batch) *multiStore {
|
||||
s := &multiStore{fuluStart: fuluStart}
|
||||
if b.bs != nil && b.bs.store != nil {
|
||||
s.blobStore = b.bs.store
|
||||
}
|
||||
if b.cs != nil && b.cs.store != nil {
|
||||
s.columnStore = b.cs.store
|
||||
}
|
||||
return s
|
||||
}
|
||||
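A minimal sketch (inside package backfill, with hypothetical store variables) of how the fork-slot routing above behaves: pre-Fulu blocks are checked against the blob store, Fulu blocks against the column store, and a nil checker is only tolerated for blocks that carry no commitments:

    ms := &multiStore{
        fuluStart:   fuluStartSlot,      // first slot of the Fulu fork (assumed precomputed)
        blobStore:   blobAvailability,   // checks BlobSidecar availability for pre-Fulu blocks
        columnStore: columnAvailability, // checks DataColumnSidecar availability for Fulu blocks
    }
    if err := ms.IsDataAvailable(ctx, currentSlot, blk); err != nil {
        return err // sidecars missing or availability check failed
    }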
@@ -92,6 +92,33 @@ var (
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeDownloadingColumns = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_columns_time_download",
|
||||
Help: "Time, in milliseconds, batch spent downloading DataColumnSidecars from peer.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeVerifyingColumns = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_columns_time_verify",
|
||||
Help: "Time, in milliseconds, batch spent verifying DataColumnSidecars.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
backfillBytesDataColumnSidecar = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_data_column_sidecar_bytes_downloaded",
|
||||
Help: "DataColumnSidecar bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
backfillDataColumnSidecarDownloaded = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_data_column_sidecar_downloaded",
|
||||
Help: "Number of DataColumnSidecar values downloaded from peers for backfill.",
|
||||
},
|
||||
[]string{"index", "validity"},
|
||||
)
|
||||
)
|
||||
|
||||
func blobValidationMetrics(_ blocks.ROBlob) error {
|
||||
|
||||
@@ -2,22 +2,18 @@ package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type batchWorkerPool interface {
|
||||
spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, blobVerifier verification.NewBlobVerifier, bfs *filesystem.BlobStorage)
|
||||
spawn(ctx context.Context, n int, a PeerAssigner, cfg *workerCfg)
|
||||
todo(b batch)
|
||||
complete() (batch, error)
|
||||
}
|
||||
@@ -26,11 +22,11 @@ type worker interface {
|
||||
run(context.Context)
|
||||
}
|
||||
|
||||
type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker
|
||||
type newWorker func(id workerId, in, out chan batch, cfg *workerCfg) worker
|
||||
|
||||
func defaultNewWorker(p p2p.P2P) newWorker {
|
||||
return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker {
|
||||
return newP2pWorker(id, p, in, out, c, v, cm, nbv, bfs)
|
||||
return func(id workerId, in, out chan batch, cfg *workerCfg) worker {
|
||||
return newP2pWorker(id, p, in, out, cfg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +41,8 @@ type p2pBatchWorkerPool struct {
|
||||
endSeq []batch
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
p2p p2p.P2P
|
||||
earliest primitives.Slot
|
||||
}
|
||||
|
||||
var _ batchWorkerPool = &p2pBatchWorkerPool{}
|
||||
@@ -59,14 +57,15 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
|
||||
fromWorkers: make(chan batch),
|
||||
maxBatches: maxBatches,
|
||||
shutdownErr: make(chan error),
|
||||
p2p: p,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) {
|
||||
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, a PeerAssigner, cfg *workerCfg) {
|
||||
p.ctx, p.cancel = context.WithCancel(ctx)
|
||||
go p.batchRouter(a)
|
||||
for i := 0; i < n; i++ {
|
||||
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v, cm, nbv, bfs).run(p.ctx)
|
||||
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, cfg).run(p.ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +102,6 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
busy := make(map[peer.ID]bool)
|
||||
todo := make([]batch, 0)
|
||||
rt := time.NewTicker(time.Second)
|
||||
earliest := primitives.Slot(math.MaxUint64)
|
||||
for {
|
||||
select {
|
||||
case b := <-p.toRouter:
|
||||
@@ -116,50 +114,72 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
// to retry failed assignments.
|
||||
case b := <-p.fromWorkers:
|
||||
pid := b.busy
|
||||
busy[pid] = false
|
||||
if b.state == batchSidecarSync {
|
||||
todo = append(todo, b)
|
||||
sortBatchDesc(todo)
|
||||
} else {
|
||||
delete(busy, pid)
|
||||
if b.workComplete() {
|
||||
p.fromRouter <- b
|
||||
break
|
||||
}
|
||||
todo = append(todo, b)
|
||||
sortBatchDesc(todo)
|
||||
case <-p.ctx.Done():
|
||||
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
|
||||
p.shutdown(p.ctx.Err())
|
||||
return
|
||||
}
|
||||
if len(todo) == 0 {
|
||||
continue
|
||||
}
|
||||
// Try to assign as many outstanding batches as possible to peers and feed the assigned batches to workers.
|
||||
assigned, err := pa.Assign(busy, len(todo))
|
||||
var err error
|
||||
todo, err = p.processTodo(todo, pa, busy)
|
||||
if err != nil {
|
||||
if errors.Is(err, peers.ErrInsufficientSuitable) {
|
||||
// Transient error resulting from insufficient number of connected peers. Leave batches in
|
||||
// queue and get to them whenever the peer situation is resolved.
|
||||
continue
|
||||
}
|
||||
p.shutdown(err)
|
||||
return
|
||||
}
|
||||
for _, pid := range assigned {
|
||||
if err := todo[0].waitUntilReady(p.ctx); err != nil {
|
||||
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
|
||||
p.shutdown(p.ctx.Err())
|
||||
return
|
||||
}
|
||||
busy[pid] = true
|
||||
todo[0].busy = pid
|
||||
p.toWorkers <- todo[0].withPeer(pid)
|
||||
if todo[0].begin < earliest {
|
||||
earliest = todo[0].begin
|
||||
oldestBatch.Set(float64(earliest))
|
||||
}
|
||||
todo = todo[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) processTodo(todo []batch, pa PeerAssigner, busy map[peer.ID]bool) ([]batch, error) {
|
||||
if len(todo) == 0 {
|
||||
return todo, nil
|
||||
}
|
||||
notBusy, err := pa.Assign(peers.NotBusy(busy, -1))
|
||||
if err != nil {
|
||||
if errors.Is(err, peers.ErrInsufficientSuitable) {
|
||||
// Transient error resulting from insufficient number of connected peers. Leave batches in
|
||||
// queue and get to them whenever the peer situation is resolved.
|
||||
return todo, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
peerRank, err := sync.ComputeColumnPeerRank(notBusy, p.p2p)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to compute column matrix for peer assignment")
|
||||
return todo, nil
|
||||
}
|
||||
if len(notBusy) == 0 {
|
||||
log.Warn("No suitable peers available for batch assignment")
|
||||
return todo, nil
|
||||
}
|
||||
for i, b := range todo {
|
||||
pid, cols, err := b.selectPeer(peerRank, busy)
|
||||
if err != nil {
|
||||
log.WithField("not_busy", len(notBusy)).WithError(err).WithFields(b.logFields()).Error("Failed to select peer for batch")
|
||||
// Return the remaining todo items and allow the outer loop to control when we try again.
|
||||
return todo[i:], nil
|
||||
}
|
||||
busy[pid] = true
|
||||
b.busy = pid
|
||||
b.nextReqCols = cols
|
||||
p.toWorkers <- b.withPeer(pid)
|
||||
p.updateEarliest(b.begin)
|
||||
}
|
||||
return []batch{}, nil
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) updateEarliest(current primitives.Slot) {
|
||||
if current >= p.earliest {
|
||||
return
|
||||
}
|
||||
p.earliest = current
|
||||
oldestBatch.Set(float64(p.earliest))
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) shutdown(err error) {
|
||||
p.cancel()
|
||||
p.shutdownErr <- err
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
@@ -24,7 +25,7 @@ type mockAssigner struct {
|
||||
|
||||
// Assign satisfies the PeerAssigner interface so that mockAssigner can be used in tests
|
||||
// in place of the concrete p2p implementation of PeerAssigner.
|
||||
func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
|
||||
func (m mockAssigner) Assign(filter peers.AssignmentFilter) ([]peer.ID, error) {
|
||||
if m.err != nil {
|
||||
return nil, m.err
|
||||
}
|
||||
@@ -53,7 +54,8 @@ func TestPoolDetectAllEnded(t *testing.T) {
|
||||
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(st.GenesisValidatorsRoot()))
|
||||
require.NoError(t, err)
|
||||
bfs := filesystem.NewEphemeralBlobStorage(t)
|
||||
pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v, ctxMap, mockNewBlobVerifier, bfs)
|
||||
wcfg := &workerCfg{c: startup.NewClock(time.Now(), [32]byte{}), nbv: mockNewBlobVerifier, v: v, cm: ctxMap, bfs: bfs}
|
||||
pool.spawn(ctx, nw, ma, wcfg)
|
||||
br := batcher{min: 10, size: 10}
|
||||
endSeq := br.before(0)
|
||||
require.Equal(t, batchEndSequence, endSeq.state)
|
||||
@@ -72,7 +74,7 @@ type mockPool struct {
|
||||
todoChan chan batch
|
||||
}
|
||||
|
||||
func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier, _ sync.ContextByteVersions, _ verification.NewBlobVerifier, _ *filesystem.BlobStorage) {
|
||||
func (m *mockPool) spawn(_ context.Context, _ int, _ PeerAssigner, _ *workerCfg) {
|
||||
}
|
||||
|
||||
func (m *mockPool) todo(b batch) {
|
||||
|
||||
@@ -4,11 +4,14 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -32,14 +35,16 @@ type Service struct {
|
||||
batchSeq *batchSequencer
|
||||
batchSize uint64
|
||||
pool batchWorkerPool
|
||||
verifier *verifier
|
||||
ctxMap sync.ContextByteVersions
|
||||
p2p p2p.P2P
|
||||
pa PeerAssigner
|
||||
batchImporter batchImporter
|
||||
blobStore *filesystem.BlobStorage
|
||||
dcStore *filesystem.DataColumnStorage
|
||||
initSyncWaiter func() error
|
||||
complete chan struct{}
|
||||
workerCfg *workerCfg
|
||||
fuluStart primitives.Slot
|
||||
}
|
||||
|
||||
var _ runtime.Service = (*Service)(nil)
|
||||
@@ -48,23 +53,13 @@ var _ runtime.Service = (*Service)(nil)
|
||||
// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
|
||||
// allowing the caller to avoid making multiple concurrent requests to the same peer.
|
||||
type PeerAssigner interface {
|
||||
Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
|
||||
//Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
|
||||
Assign(filter peers.AssignmentFilter) ([]peer.ID, error)
|
||||
}
|
||||
|
||||
type minimumSlotter func(primitives.Slot) primitives.Slot
|
||||
type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
|
||||
|
||||
func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
|
||||
status := su.status()
|
||||
if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
|
||||
return status, err
|
||||
}
|
||||
// Import blocks to db and update db state to reflect the newly imported blocks.
|
||||
// Other parts of the beacon node may use the same StatusUpdater instance
|
||||
// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
|
||||
return su.fillBack(ctx, current, b.results, b.availabilityStore())
|
||||
}
|
||||
|
||||
// ServiceOption represents a functional option for the backfill service constructor.
|
||||
type ServiceOption func(*Service) error
|
||||
|
||||
@@ -137,48 +132,43 @@ func WithMinimumSlot(s primitives.Slot) ServiceOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithCustodyInfo(custodyInfo *peerdas.CustodyInfo) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
if s.workerCfg == nil {
|
||||
s.workerCfg = &workerCfg{}
|
||||
}
|
||||
s.workerCfg.custodyInfo = custodyInfo
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewService initializes the backfill Service. Like all implementations of the Service interface,
|
||||
// the service won't begin its runloop until Start() is called.
|
||||
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
|
||||
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, dcStore *filesystem.DataColumnStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
store: su,
|
||||
blobStore: bStore,
|
||||
cw: cw,
|
||||
ms: minimumBackfillSlot,
|
||||
p2p: p,
|
||||
pa: pa,
|
||||
batchImporter: defaultBatchImporter,
|
||||
complete: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
store: su,
|
||||
blobStore: bStore,
|
||||
dcStore: dcStore,
|
||||
cw: cw,
|
||||
ms: minimumBackfillSlot,
|
||||
p2p: p,
|
||||
pa: pa,
|
||||
complete: make(chan struct{}),
|
||||
fuluStart: slots.SafeEpochStartOrMax(params.BeaconConfig().FuluForkEpoch),
|
||||
}
|
||||
s.batchImporter = s.defaultBatchImporter
|
||||
for _, o := range opts {
|
||||
if err := o(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.pool = newP2PBatchWorkerPool(p, s.nWorkers)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByteVersions, error) {
|
||||
cps, err := s.store.originState(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
keys, err := cps.PublicKeys()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
|
||||
}
|
||||
vr := cps.GenesisValidatorsRoot()
|
||||
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, keys)
|
||||
return v, ctxMap, err
|
||||
}
|
||||
|
||||
func (s *Service) updateComplete() bool {
|
||||
b, err := s.pool.complete()
|
||||
if err != nil {
|
||||
@@ -229,6 +219,18 @@ func (s *Service) importBatches(ctx context.Context) {
|
||||
backfillRemainingBatches.Set(float64(nt))
|
||||
}
|
||||
|
||||
func (s *Service) defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
|
||||
status := su.status()
|
||||
if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
|
||||
return status, err
|
||||
}
|
||||
// Import blocks to db and update db state to reflect the newly imported blocks.
|
||||
// Other parts of the beacon node may use the same StatusUpdater instance
|
||||
// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
|
||||
|
||||
return su.fillBack(ctx, current, b.results, newMultiStore(s.fuluStart, b))
|
||||
}
|
||||
|
||||
func (s *Service) scheduleTodos() {
|
||||
batches, err := s.batchSeq.sequence()
|
||||
if err != nil {
|
||||
@@ -260,25 +262,19 @@ func (s *Service) Start() {
|
||||
log.Info("Backfill service is shutting down")
|
||||
cancel()
|
||||
}()
|
||||
clock, err := s.cw.WaitForClock(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
|
||||
return
|
||||
}
|
||||
s.clock = clock
|
||||
v, err := s.verifierWaiter.WaitForInitializer(ctx)
|
||||
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not initialize blob verifier in backfill service")
|
||||
return
|
||||
}
|
||||
|
||||
if s.store.isGenesisSync() {
|
||||
log.Info("Backfill short-circuit; node synced from genesis")
|
||||
s.markComplete()
|
||||
return
|
||||
}
|
||||
|
||||
clock, err := s.cw.WaitForClock(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
|
||||
return
|
||||
}
|
||||
s.clock = clock
|
||||
status := s.store.status()
|
||||
// Exit early if there aren't going to be any batches to backfill.
|
||||
if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
|
||||
@@ -288,11 +284,6 @@ func (s *Service) Start() {
|
||||
s.markComplete()
|
||||
return
|
||||
}
|
||||
s.verifier, s.ctxMap, err = s.initVerifier(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to initialize backfill verifier")
|
||||
return
|
||||
}
|
||||
|
||||
if s.initSyncWaiter != nil {
|
||||
log.Info("Backfill service waiting for initial-sync to reach head before starting")
|
||||
@@ -301,7 +292,14 @@ func (s *Service) Start() {
|
||||
return
|
||||
}
|
||||
}
|
||||
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
|
||||
|
||||
wc, err := initWorkerCfg(ctx, s.workerCfg, s.clock, s.verifierWaiter, s.store, s.blobStore, s.dcStore)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not initialize blob verifier in backfill service")
|
||||
return
|
||||
}
|
||||
|
||||
s.pool.spawn(ctx, s.nWorkers, s.pa, wc)
|
||||
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
|
||||
if err = s.initBatches(); err != nil {
|
||||
log.WithError(err).Error("Non-recoverable error in backfill service")
|
||||
@@ -370,6 +368,12 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
|
||||
}
|
||||
}
|
||||
|
||||
func newDataColumnVerifierFromInitializer(ini *verification.Initializer) verification.NewDataColumnsVerifier {
|
||||
return func(cols []blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnsVerifier {
|
||||
return ini.NewDataColumnsVerifier(cols, reqs)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) markComplete() {
|
||||
close(s.complete)
|
||||
log.Info("Backfill service marked as complete")
|
||||
|
||||
@@ -57,7 +57,8 @@ func TestServiceInit(t *testing.T) {
|
||||
pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)}
|
||||
p2pt := p2ptest.NewTestP2P(t)
|
||||
bfs := filesystem.NewEphemeralBlobStorage(t)
|
||||
srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{},
|
||||
dcs := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
srv, err := NewService(ctx, su, bfs, dcs, cw, p2pt, &mockAssigner{},
|
||||
WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{}))
|
||||
require.NoError(t, err)
|
||||
srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot
|
||||
|
||||
@@ -74,7 +74,7 @@ func (s *Store) status() *dbval.BackfillStatus {
|
||||
// fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values
|
||||
// from the first block in the slice. This method assumes that the block slice has been fully validated and
|
||||
// sorted in slot order by the calling function.
|
||||
func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityStore) (*dbval.BackfillStatus, error) {
|
||||
func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityChecker) (*dbval.BackfillStatus, error) {
|
||||
status := s.status()
|
||||
if len(blocks) == 0 {
|
||||
return status, nil
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -23,18 +22,34 @@ var errUnknownDomain = errors.New("runtime error looking up signing domain for f
|
||||
type verifiedROBlocks []blocks.ROBlock
|
||||
|
||||
func (v verifiedROBlocks) blobIdents(retentionStart primitives.Slot) ([]blobSummary, error) {
|
||||
// early return if the newest block is outside the retention window
|
||||
if len(v) > 0 && v[len(v)-1].Block().Slot() < retentionStart {
|
||||
if len(v) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
latest := v[len(v)-1].Block().Slot()
|
||||
// early return if the newest block is outside the retention window
|
||||
if latest < retentionStart {
|
||||
return nil, nil
|
||||
}
|
||||
fuluStart := params.BeaconConfig().FuluForkEpoch
|
||||
// If the batch end slot or last result block are pre-fulu, so are the rest.
|
||||
if slots.ToEpoch(latest) < fuluStart {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
bs := make([]blobSummary, 0)
|
||||
for i := range v {
|
||||
if v[i].Block().Slot() < retentionStart {
|
||||
slot := v[i].Block().Slot()
|
||||
if slot < retentionStart {
|
||||
continue
|
||||
}
|
||||
if v[i].Block().Version() < version.Deneb {
|
||||
continue
|
||||
}
|
||||
// Assuming blocks are sorted, as soon as we see 1 fulu block we know the rest will also be fulu.
|
||||
if slots.ToEpoch(slot) >= fuluStart {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
c, err := v[i].Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unexpected error checking commitments for block root %#x", v[i].Root())
|
||||
@@ -57,37 +72,31 @@ type verifier struct {
|
||||
domain *domainCache
|
||||
}
|
||||
|
||||
// TODO: rewrite this to use ROBlock.
|
||||
func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (verifiedROBlocks, error) {
|
||||
func (vr verifier) verify(blks []blocks.ROBlock) (verifiedROBlocks, error) {
|
||||
var err error
|
||||
result := make([]blocks.ROBlock, len(blks))
|
||||
sigSet := bls.NewSet()
|
||||
for i := range blks {
|
||||
result[i], err = blocks.NewROBlock(blks[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i > 0 && result[i-1].Root() != result[i].Block().ParentRoot() {
|
||||
p, b := result[i-1], result[i]
|
||||
if i > 0 && blks[i-1].Root() != blks[i].Block().ParentRoot() {
|
||||
p, b := blks[i-1], blks[i]
|
||||
return nil, errors.Wrapf(errInvalidBatchChain,
|
||||
"slot %d parent_root=%#x, slot %d root=%#x",
|
||||
b.Block().Slot(), b.Block().ParentRoot(),
|
||||
p.Block().Slot(), p.Root())
|
||||
}
|
||||
set, err := vr.blockSignatureBatch(result[i])
|
||||
set, err := vr.blockSignatureBatch(blks[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "block signature batch")
|
||||
}
|
||||
sigSet.Join(set)
|
||||
}
|
||||
v, err := sigSet.Verify()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "block signature verification error")
|
||||
return nil, errors.Wrap(err, "SignatureBatch Verify")
|
||||
}
|
||||
if !v {
|
||||
return nil, errors.New("batch block signature verification failed")
|
||||
return nil, errors.New("SignatureBatch Verify invalid")
|
||||
}
|
||||
return result, nil
|
||||
return blks, nil
|
||||
}
|
||||
|
||||
func (vr verifier) blockSignatureBatch(b blocks.ROBlock) (*bls.SignatureBatch, error) {
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -72,12 +71,7 @@ func TestVerify(t *testing.T) {
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, pubkeys)
|
||||
require.NoError(t, err)
|
||||
notrob := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
|
||||
// We have to unwrap the ROBlocks for this code because that's what it expects (for now).
|
||||
for i := range blks {
|
||||
notrob[i] = blks[i].ReadOnlySignedBeaconBlock
|
||||
}
|
||||
vbs, err := v.verify(notrob)
|
||||
vbs, err := v.verify(blks)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(blks), len(vbs))
|
||||
}
|
||||
|
||||
@@ -4,14 +4,63 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type workerCfg struct {
|
||||
c *startup.Clock
|
||||
v *verifier
|
||||
cm sync.ContextByteVersions
|
||||
nbv verification.NewBlobVerifier
|
||||
ndcv verification.NewDataColumnsVerifier
|
||||
bfs *filesystem.BlobStorage
|
||||
cfs *filesystem.DataColumnStorage
|
||||
custodyInfo *peerdas.CustodyInfo
|
||||
}
|
||||
|
||||
func initWorkerCfg(ctx context.Context, cfg *workerCfg, c *startup.Clock, vw InitializerWaiter, store *Store, bfs *filesystem.BlobStorage, cfs *filesystem.DataColumnStorage) (*workerCfg, error) {
|
||||
vi, err := vw.WaitForInitializer(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cps, err := store.originState(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys, err := cps.PublicKeys()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
|
||||
}
|
||||
vr := cps.GenesisValidatorsRoot()
|
||||
cm, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, keys)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "newBackfillVerifier failed")
|
||||
}
|
||||
if cfg == nil {
|
||||
cfg = &workerCfg{}
|
||||
}
|
||||
cfg.c = c
|
||||
cfg.v = v
|
||||
cfg.cm = cm
|
||||
cfg.bfs = bfs
|
||||
cfg.cfs = cfs
|
||||
cfg.nbv = newBlobVerifierFromInitializer(vi)
|
||||
cfg.ndcv = newDataColumnVerifierFromInitializer(vi)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
type workerId int
|
||||
|
||||
type p2pWorker struct {
|
||||
@@ -19,23 +68,38 @@ type p2pWorker struct {
|
||||
todo chan batch
|
||||
done chan batch
|
||||
p2p p2p.P2P
|
||||
v *verifier
|
||||
c *startup.Clock
|
||||
cm sync.ContextByteVersions
|
||||
nbv verification.NewBlobVerifier
|
||||
bfs *filesystem.BlobStorage
|
||||
cfg *workerCfg
|
||||
}
|
||||
|
||||
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, cfg *workerCfg) *p2pWorker {
|
||||
return &p2pWorker{
|
||||
id: id,
|
||||
todo: todo,
|
||||
done: done,
|
||||
p2p: p,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *p2pWorker) run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case b := <-w.todo:
|
||||
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
|
||||
if b.state == batchSidecarSync {
|
||||
w.done <- w.handleSidecars(ctx, b)
|
||||
} else {
|
||||
w.done <- w.handleBlocks(ctx, b)
|
||||
if err := b.waitUntilReady(ctx); err != nil {
|
||||
log.WithField("batch_id", b.id()).WithError(ctx.Err()).Info("worker context canceled while waiting to retry")
|
||||
continue
|
||||
}
|
||||
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
|
||||
if b.state == batchSyncBlobs {
|
||||
w.done <- w.handleSidecars(ctx, b)
|
||||
continue
|
||||
}
|
||||
if b.state == batchSyncColumns {
|
||||
w.done <- w.handleColumns(ctx, b)
|
||||
continue
|
||||
}
|
||||
|
||||
w.done <- w.handleBlocks(ctx, b)
|
||||
case <-ctx.Done():
|
||||
log.WithField("backfillWorker", w.id).Info("Backfill worker exiting after context canceled")
|
||||
return
|
||||
@@ -44,21 +108,27 @@ func (w *p2pWorker) run(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
	cs := w.c.CurrentSlot()
	blobRetentionStart, err := sync.BlobRPCMinValidSlot(cs)
	current := w.cfg.c.CurrentSlot()
	blobRetentionStart, err := sync.BlobRPCMinValidSlot(current)
	if err != nil {
		return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot"))
	}
	b.blockPid = b.busy
	start := time.Now()
	results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics)
	dlt := time.Now()
	backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds()))
	results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.cfg.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics)
	if err != nil {
		log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed")
		return b.withRetryableError(err)
	}
	vb, err := w.v.verify(results)
	dlt := time.Now()
	backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds()))
	toVerify, err := blocks.NewROBlockSlice(results)
	if err != nil {
		log.WithError(err).WithFields(b.logFields()).Debug("Batch conversion to ROBlock failed")
		return b.withRetryableError(err)
	}

	vb, err := w.cfg.v.verify(toVerify)
	backfillBatchTimeVerifying.Observe(float64(time.Since(dlt).Milliseconds()))
	if err != nil {
		log.WithError(err).WithFields(b.logFields()).Debug("Batch validation failed")
@@ -73,11 +143,18 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
	}
	backfillBlocksApproximateBytes.Add(float64(bdl))
	log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded")
	bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs})
	bscfg := &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.cfg.nbv, store: w.cfg.bfs}
	bs, err := newBlobSync(current, vb, bscfg)
	if err != nil {
		return b.withRetryableError(err)
	}
	return b.withResults(vb, bs)
	w.cfg.custodyInfo.Mut.RLock()
	defer w.cfg.custodyInfo.Mut.RUnlock()
	cs, err := newColumnSync(b, vb, current, w.p2p, vb, w.cfg)
	if err != nil {
		return b.withFatalError(err)
	}
	return b.postBlockSync(vb, bs, cs)
}

func (w *p2pWorker) handleSidecars(ctx context.Context, b batch) batch {
@@ -85,7 +162,7 @@ func (w *p2pWorker) handleSidecars(ctx context.Context, b batch) batch {
	start := time.Now()
	// we don't need to use the response for anything other than metrics, because blobResponseValidation
	// adds each of them to a batch AvailabilityStore once it is checked.
	blobs, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.blobPid, w.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics)
	blobs, err := sync.SendBlobsByRangeRequest(ctx, w.cfg.c, w.p2p, b.blobPid, w.cfg.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics)
	if err != nil {
		b.bs = nil
		return b.withRetryableError(err)
@@ -98,19 +175,19 @@ func (w *p2pWorker) handleSidecars(ctx context.Context, b batch) batch {
		backfillBlobsApproximateBytes.Add(float64(sz))
		log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("Backfill batch blob bytes downloaded")
	}
	return b.postBlobSync()
	return b.postSidecarSync()
}

func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) *p2pWorker {
	return &p2pWorker{
		id:   id,
		todo: todo,
		done: done,
		p2p:  p,
		v:    v,
		c:    c,
		cm:   cm,
		nbv:  nbv,
		bfs:  bfs,
func (w *p2pWorker) handleColumns(ctx context.Context, b batch) batch {
	b.columnPid = b.busy
	start := time.Now()
	vr := b.validatingColumnRequest()
	// Response is dropped because the validation code adds the columns to the columnSync AvailabilityStore under the hood.
	_, err := sync.SendDataColumnSidecarsByRangeRequest(ctx, w.cfg.c, w.p2p, b.busy, w.cfg.cm, vr.req, vr.validate)
	if err != nil {
		return b.withRetryableError(errors.Wrap(err, "failed to request data column sidecars"))
	}
	dlt := time.Now()
	backfillBatchTimeDownloadingColumns.Observe(float64(dlt.Sub(start).Milliseconds()))
	return b.postSidecarSync()
}

beacon-chain/sync/data_column_assignment.go (new file, 211 lines)
@@ -0,0 +1,211 @@
package sync

import (
	"slices"

	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
	"github.com/OffchainLabs/prysm/v6/config/params"
	"github.com/OffchainLabs/prysm/v6/crypto/rand"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/pkg/errors"
)

type columnRankedPeer struct {
	peerID    peer.ID
	nodeID    enode.ID
	custodied []uint64
	cov       float64
}

func (p *columnRankedPeer) covered(needed peerdas.ColumnIndices) []uint64 {
	covered := make([]uint64, 0, len(p.custodied))
	for col, want := range needed {
		if want && p.custodied[col] == 1 {
			covered = append(covered, uint64(col))
		}
	}
	return covered
}

func (p *columnRankedPeer) coverageScore(rarity []float64) float64 {
	if p.cov == 0 {
		p.cov = coverageScore(p.custodied, rarity)
	}
	return p.cov
}

type ColumnPeerRank struct {
	peers        map[peer.ID]*columnRankedPeer
	freq         []colFreq
	rarity       []float64
	rg           rand.Rand
	covScoreRank []*columnRankedPeer
}

func coverageScore(covered []uint64, rarity []float64) float64 {
	score := 0.0
	for _, col := range covered {
		if col >= uint64(len(rarity)) {
			continue
		}
		score += rarity[col]
	}
	return score
}

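For intuition, the following standalone sketch (illustrative only, not part of this diff) shows how the rarity weighting above plays out, assuming three columns custodied by 1, 2, and 4 peers respectively: rarity is 1/frequency, and a peer's score is the sum of the rarities of the needed columns it covers.

package main

import "fmt"

// Illustrative only: mirrors colFreq.rarity (1/frequency) and coverageScore
// (sum of rarities over covered columns) from the file above.
func main() {
	freq := []int{1, 2, 4} // number of custodians for columns 0, 1, 2
	rarity := make([]float64, len(freq))
	for col, f := range freq {
		rarity[col] = 1 / float64(f) // 1.0, 0.5, 0.25
	}
	// A peer covering the rare column 0 plus column 2 outranks a peer
	// covering the two more common columns 1 and 2.
	scoreA := rarity[0] + rarity[2] // 1.25
	scoreB := rarity[1] + rarity[2] // 0.75
	fmt.Println(scoreA > scoreB)    // true
}
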
func (m *ColumnPeerRank) HighestForIndices(needed peerdas.ColumnIndices, busy map[peer.ID]bool) (peer.ID, []uint64, error) {
	// - find the custodied column with the lowest frequency
	// - collect all the peers that have custody of that column
	// - score the peers by how many other of the needed columns they have
	// -- or, score them by the rank of the columns they have??
	for _, cf := range m.freq {
		if !needed[cf.col] {
			continue
		}
		if cf.freq() == 0 {
			continue
		}
		var best *columnRankedPeer
		bestScore, bestCoverage := 0.0, make([]uint64, 1)
		for _, p := range cf.custodians {
			if busy[p.peerID] {
				continue
			}
			coverage := p.covered(needed)
			if len(coverage) == 0 {
				continue
			}
			pscore := coverageScore(coverage, m.rarity)
			if pscore > bestScore {
				best, bestScore, bestCoverage = p, pscore, coverage
			}
		}
		if best != nil {
			return best.peerID, bestCoverage, nil
		}
	}

	return "", nil, errors.New("no peers able to cover needed columns")
}

func NeededCoveredIntersection(needed peerdas.ColumnIndices, covered []uint64) []uint64 {
	intersection := make([]uint64, 0, len(covered))
	for _, col := range covered {
		if needed[col] {
			intersection = append(intersection, col)
		}
	}
	return intersection
}

// Lowest returns the lowest scoring peer in the set. This can be used to pick a peer
// for block requests, preserving the peers that have the highest coverage scores
// for column requests.
func (m *ColumnPeerRank) Lowest(busy map[peer.ID]bool) (peer.ID, error) {
	for i := len(m.covScoreRank) - 1; i >= 0; i-- {
		p := m.covScoreRank[i]
		if !busy[p.peerID] {
			return p.peerID, nil
		}
	}
	return "", errors.New("no peers available")
}

type colFreq struct {
	col        uint64
	custodians []*columnRankedPeer
}

func (f colFreq) rarity() float64 {
	if f.freq() == 0 {
		return 1
	}
	return 1 / float64(f.freq())
}

func (f colFreq) freq() int {
	return len(f.custodians)
}

type colFreqs []colFreq

func (s colFreqs) rarity() []float64 {
	ra := make([]float64, len(s))
	for _, f := range s {
		ra[f.col] = f.rarity()
	}
	return ra
}

// ComputeColumnPeerRank computes a grid of column custody x peer and ranks peers by coverage score.
func ComputeColumnPeerRank(peers []peer.ID, p2pSvc p2p.P2P) (*ColumnPeerRank, error) {
	nc := params.BeaconConfig().NumberOfColumns
	grid := make(map[peer.ID]*columnRankedPeer, len(peers))
	freqByColumn := make([]colFreq, nc)
	for i := range freqByColumn {
		freqByColumn[i].col = uint64(i)
	}
	for _, peer := range peers {
		nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
		if err != nil {
			log.WithField("peerID", peer).WithError(err).Debug("Failed to convert peer ID to node ID.")
			continue
		}
		dasInfo, _, err := peerdas.Info(nodeID, p2pSvc.CustodyGroupCountFromPeer(peer))
		if err != nil {
			log.WithField("peerID", peer).WithField("nodeID", nodeID).WithError(err).Debug("Failed to derive custody groups from peer.")
			return nil, errors.Wrap(err, "custody groups")
		}
		p := &columnRankedPeer{
			peerID:    peer,
			nodeID:    nodeID,
			custodied: make([]uint64, nc),
		}
		for c, v := range dasInfo.CustodyColumns {
			if c > nc-1 {
				return nil, errors.Errorf("column %d is out of bounds", c)
			}
			if v {
				p.custodied[c] = 1
				freqByColumn[c].custodians = append(freqByColumn[c].custodians, p)
			}
		}
		grid[peer] = p
	}

	var colByFreq colFreqs
	colByFreq = slices.SortedFunc(slices.Values(freqByColumn), func(a, b colFreq) int {
		if a.freq() == b.freq() {
			return 0
		}
		if a.freq() < b.freq() {
			return -1
		}
		return 1
	})
	rarity := colByFreq.rarity()

	covScoreRank := make([]*columnRankedPeer, 0, len(grid))
	for _, p := range grid {
		covScoreRank = append(covScoreRank, p)
	}
	slices.SortFunc(covScoreRank, func(a, b *columnRankedPeer) int {
		if a.coverageScore(rarity) == b.coverageScore(rarity) {
			return 0
		}
		if a.coverageScore(rarity) < b.coverageScore(rarity) {
			return -1
		}
		return 1
	})

	return &ColumnPeerRank{
		peers:        grid,
		freq:         colByFreq,
		rg:           *rand.NewGenerator(),
		rarity:       rarity,
		covScoreRank: covScoreRank,
	}, nil
}

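To show how these pieces are meant to fit together, here is a hedged caller-side sketch. The helper name pickColumnPeer is hypothetical; it assumes the imports already present in this file, a connected p2p service, a candidate peer list, and the set of column indices still needed for a batch.

// Hypothetical sketch, not part of this diff: rank the candidate peers, then
// ask for the best non-busy custodian of the rarest still-needed column.
func pickColumnPeer(p2pSvc p2p.P2P, candidates []peer.ID, needed peerdas.ColumnIndices, busy map[peer.ID]bool) (peer.ID, []uint64, error) {
	rank, err := ComputeColumnPeerRank(candidates, p2pSvc)
	if err != nil {
		return "", nil, err
	}
	// Returns the chosen peer along with the subset of needed columns it custodies.
	return rank.HighestForIndices(needed, busy)
}
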
@@ -3,6 +3,7 @@ package sync
import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
@@ -922,3 +923,18 @@ func uint64MapDiffer(left, right map[uint64]bool) bool {

	return false
}

func DataColumnSidecarsByRangeRequest(columns []uint64, start, end primitives.Slot) *eth.DataColumnSidecarsByRangeRequest {
	sort.Slice(columns, func(i, j int) bool {
		return columns[i] < columns[j]
	})
	return &eth.DataColumnSidecarsByRangeRequest{
		StartSlot: start,
		Count:     uint64(end-start) + 1,
		Columns:   columns,
	}
}

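As a quick, illustrative usage of the request builder above (the helper name exampleColumnRangeRequest is hypothetical): the column list is sorted in place and Count covers both endpoints of the slot range.

// Illustrative only: a by-range request for slots 96..127 over three columns.
func exampleColumnRangeRequest() *eth.DataColumnSidecarsByRangeRequest {
	cols := []uint64{40, 3, 12} // sorted to 3, 12, 40 by the builder
	req := DataColumnSidecarsByRangeRequest(cols, 96, 127)
	// req.StartSlot == 96, req.Count == 32, req.Columns == [3 12 40]
	return req
}
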
func RequestDataColumnsByRoot(ctx context.Context, ctxMap ContextByteVersions, pid peer.ID, reqs types.DataColumnsByRootIdentifiers) ([]blocks.RODataColumn, error) {
	return nil, nil
}

@@ -358,6 +358,7 @@ func SendDataColumnSidecarsByRangeRequest(
	pid peer.ID,
	ctxMap ContextByteVersions,
	req *ethpb.DataColumnSidecarsByRangeRequest,
	vfs ...DataColumnResponseValidation,
) ([]blocks.RODataColumn, error) {
	topic, err := p2p.TopicFromMessage(p2p.DataColumnSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
	if err != nil {
@@ -399,6 +400,7 @@ func SendDataColumnSidecarsByRangeRequest(
		dataColumnValidatorFromRangeReq(req),
		dataColumnIndexValidatorFromRangeReq(req),
	}
	vfuncs = append(vfuncs, vfs...)

	// Read the data column sidecars from the stream.
	roDataColumns := make([]blocks.RODataColumn, 0, max)

@@ -311,6 +311,19 @@ func MaxSafeEpoch() primitives.Epoch {
	return primitives.Epoch(math.MaxUint64 / uint64(params.BeaconConfig().SlotsPerEpoch))
}

// SafeEpochStartOrMax returns the start slot of the given epoch if it will not overflow,
// otherwise it returns the start slot of the epoch immediately before MaxSafeEpoch.
func SafeEpochStartOrMax(e primitives.Epoch) primitives.Slot {
	// The max value converted to a slot can't be the start of a conceptual epoch,
	// because the first slot of that epoch would overflow,
	// so use the start slot of the epoch right before that value.
	me := MaxSafeEpoch() - 1
	if e > me {
		return UnsafeEpochStart(me)
	}
	return UnsafeEpochStart(e)
}

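A brief, illustrative sketch of the clamping behavior above (the helper name is hypothetical and assumes the math and primitives packages already imported in this file): epochs at or beyond the maximum safe epoch are clamped to the start slot of MaxSafeEpoch()-1, while ordinary epochs map to their usual start slot.

// Illustrative only: demonstrates the overflow clamp in SafeEpochStartOrMax.
func exampleSafeEpochStartOrMax() {
	normal := SafeEpochStartOrMax(10)                                // epoch 10 * SlotsPerEpoch
	clamped := SafeEpochStartOrMax(primitives.Epoch(math.MaxUint64)) // start of MaxSafeEpoch() - 1
	_, _ = normal, clamped
}
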
// SecondsUntilNextEpochStart returns how many seconds until the next Epoch start from the current time and slot
func SecondsUntilNextEpochStart(genesisTimeSec uint64) (uint64, error) {
	currentSlot := CurrentSlot(genesisTimeSec)