Files
prysm/beacon-chain/sync/rpc_send_request.go
kasey 918129cf36 Replace statefeed Initialize (#12285)
* refactor initialization to blocking startup method

* require genesisSetter in blockchain, fix tests

* work-around gazelle weirdness

* fix dep gazelle ignores

* only call SetGenesis once

* fix typo

* validator test setup and fix to return right error

* move waitForChainStart to Start

* wire up sync Service.genesisWaiter

* fix p2p genesisWaiter plumbing

* remove extra clock type, integrate into genesis and rename

* use time.Now when no Nower is specified

* remove unused ClockSetter

* simplify rpc context checking

* fix typo

* use clock everywhere in sync; [32]byte val root

* don't use DeepEqual to compare [32]byte and []byte

* don't use clock in init sync, not wired up yet

* use clock waiter in blockchain as well

* use cancelable contexts in tests with goroutines

* missed a reference to WithClockSetter

* Update beacon-chain/startup/genesis.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/blockchain/service_test.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* more clear docs

* doc for NewClock

* move clock typedef to more logical file name

* adding documentation

* gaz

* fixes for capella

* reducing test raciness

* fix races in committee cache tests

* lint

* add tests on Duration slot math helper

* startup package test coverage

* fix bad merge

* set non-zero genesis time in tests that call Start

* happy deepsource, happy me-epsource

* replace Synced event with channel

* remove unused error

* remove accidental wip commit

* gaz!

* remove unused event constants

* remove sync statefeed subscription to fix deadlock

* remove state notifier

* fix build

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: nisdas <nishdas93@gmail.com>
2023-05-03 04:34:01 +00:00

132 lines
4.5 KiB
Go

package sync
import (
"context"
"io"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
// ErrInvalidFetchedData is returned when the blocks streamed back by a peer violate the
// constraints of the request: too many blocks, a slot outside the requested range, or
// slots that are out of order.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")
// BeaconBlockProcessor defines a block processing function that is invoked on each block
// as soon as it has been read from the response stream, which allows callers to start
// using blocks before the whole response has been received. Returning a non-nil error
// aborts the request that invoked it.
type BeaconBlockProcessor func(block interfaces.ReadOnlySignedBeaconBlock) error
// SendBeaconBlocksByRangeRequest issues a BeaconBlocksByRange request to the peer pid and
// collects the blocks streamed back in response.
//
// Each block read from the stream is validated against the request before it is accepted:
//   - the response may hold at most req.Count blocks, and at most MaxRequestBlocks;
//   - every slot must lie in [req.StartSlot, req.StartSlot + req.Count*req.Step);
//   - after the first block, slots must be strictly increasing and, when req.Step is
//     non-zero, consecutive slots must differ by a multiple of req.Step.
//
// Any violation yields ErrInvalidFetchedData. When blockProcessor is non-nil it is called
// on every accepted block as it arrives; an error from it aborts the request. Whenever an
// error is returned, the returned slice is nil.
func SendBeaconBlocksByRangeRequest(
	ctx context.Context, tor blockchain.TemporalOracle, p2pProvider p2p.SenderEncoder, pid peer.ID,
	req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
	epoch := slots.ToEpoch(tor.CurrentSlot())
	topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRangeMessageName, epoch)
	if err != nil {
		return nil, err
	}

	stream, err := p2pProvider.Send(ctx, req, topic, pid)
	if err != nil {
		return nil, err
	}
	defer closeStream(stream, log)

	fetched := make([]interfaces.ReadOnlySignedBeaconBlock, 0, req.Count)
	var lastSlot primitives.Slot

	for n := uint64(0); ; n++ {
		first := n == 0
		b, err := ReadChunkedBlock(stream, tor, p2pProvider, first)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// The peer has finished sending; everything read so far is the result.
				break
			}
			return nil, err
		}

		// A well-behaved peer never sends more than `count` blocks, nor more than
		// MAX_REQUEST_BLOCKS blocks.
		switch {
		case n >= req.Count, n >= params.BeaconNetworkConfig().MaxRequestBlocks:
			return nil, ErrInvalidFetchedData
		}

		// The slot must fall inside [start_slot, start_slot + count * step).
		slot := b.Block().Slot()
		if !(slot >= req.StartSlot && slot < req.StartSlot.Add(req.Count*req.Step)) {
			return nil, ErrInvalidFetchedData
		}

		// From the second block on, slots must advance, and must do so in `step`
		// increments (skipped slots in between are allowed).
		if !first {
			if lastSlot >= slot || (req.Step != 0 && slot.SubSlot(lastSlot).Mod(req.Step) != 0) {
				return nil, ErrInvalidFetchedData
			}
		}
		lastSlot = slot

		// Record the block, then hand it to the caller's processor if one was supplied.
		fetched = append(fetched, b)
		if blockProcessor != nil {
			if err := blockProcessor(b); err != nil {
				return nil, err
			}
		}
	}

	return fetched, nil
}
// SendBeaconBlocksByRootRequest issues a BeaconBlocksByRoot request to the peer pid and
// collects the blocks streamed back in response.
//
// At most len(*req) blocks, and at most MaxRequestBlocks blocks, are read from the stream;
// reading also stops as soon as the stream reports io.EOF. When blockProcessor is non-nil
// it is called on every block as it arrives; an error from it aborts the request. Whenever
// an error is returned, the returned slice is nil.
func SendBeaconBlocksByRootRequest(
	ctx context.Context, clock blockchain.TemporalOracle, p2pProvider p2p.P2P, pid peer.ID,
	req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
	epoch := slots.ToEpoch(clock.CurrentSlot())
	topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRootsMessageName, epoch)
	if err != nil {
		return nil, err
	}

	stream, err := p2pProvider.Send(ctx, req, topic, pid)
	if err != nil {
		return nil, err
	}
	defer closeStream(stream, log)

	fetched := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(*req))

	// Stop reading once every requested root has had a chance to be answered, or once
	// the peer would exceed the maximum number of blocks allowed per request.
	for n := 0; n < len(*req) && uint64(n) < params.BeaconNetworkConfig().MaxRequestBlocks; n++ {
		b, err := ReadChunkedBlock(stream, clock, p2pProvider, n == 0)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// The peer has finished sending; everything read so far is the result.
				break
			}
			return nil, err
		}

		// Record the block, then hand it to the caller's processor if one was supplied.
		fetched = append(fetched, b)
		if blockProcessor != nil {
			if err := blockProcessor(b); err != nil {
				return nil, err
			}
		}
	}

	return fetched, nil
}