Compare commits

...

10 Commits

Author SHA1 Message Date
Manu NALEPA
aa9e3158b4 Blobs: Refactor. 2024-11-29 12:54:36 +01:00
Manu NALEPA
425a7e327e PeerDASIsActive: Remove tautological condition. 2024-11-29 10:53:53 +01:00
Manu NALEPA
5a7aa3715e Fix typo: 7954 ==> 7594. 2024-11-29 10:51:41 +01:00
Manu NALEPA
453ea01deb disconnectFromPeer: Remove unused function. 2024-11-28 17:37:30 +01:00
Manu NALEPA
6537f8011e Merge branch 'peerDAS' into peerDAS-do-not-merge 2024-11-28 17:27:44 +01:00
Manu NALEPA
5f17317c1c Revert "Add error count prom metric (#14670)"
This reverts commit b28b1ed6ce.
2024-11-28 16:37:19 +01:00
Manu NALEPA
79d05a87bb listenForNewNodes and FindPeersWithSubnet: Stop using ReadNodes and use iterator instead. (#14669)
* `listenForNewNodes` and `FindPeersWithSubnet`: Stop using `Readnodes` and use iterator instead.

It avoids infinite loop in small devnets.

* Update beacon-chain/p2p/discovery.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
2024-11-28 11:25:28 +00:00
kasey
1707cf3ec7 http response handling improvements (#14673)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-11-27 22:13:45 +00:00
wangjingcun
bdbb850250 chore: fix 404 status URL (#14675)
Signed-off-by: wangjingcun <wangjingcun@aliyun.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2024-11-27 15:54:00 +00:00
Dhruv Bodani
b28b1ed6ce Add error count prom metric (#14670)
* add error count prom metric

* address review comments

* add comment for response writer

* update changelog
2024-11-27 11:56:07 +00:00
21 changed files with 132 additions and 129 deletions

View File

@@ -65,6 +65,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Updated light client consensus types. [PR](https://github.com/prysmaticlabs/prysm/pull/14652)
- Fixed pending deposits processing on Electra.
- Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version.
- Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673)
### Deprecated
@@ -96,6 +97,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- corrects nil check on some interface attestation types
- temporary solution to handling electra attesation and attester_slashing events. [pr](14655)
- Diverse log improvements and comment additions.
- P2P: Avoid infinite loop when looking for peers in small networks.
### Security

View File

@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/client:go_default_library",
"//api/server/structs:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api"
"github.com/prysmaticlabs/prysm/v5/api/client"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -176,7 +177,7 @@ func (c *Client) do(ctx context.Context, method string, path string, body io.Rea
err = non200Err(r)
return
}
res, err = io.ReadAll(r.Body)
res, err = io.ReadAll(io.LimitReader(r.Body, client.MaxBodySize))
if err != nil {
err = errors.Wrap(err, "error reading http response body from builder server")
return
@@ -358,7 +359,7 @@ func (c *Client) Status(ctx context.Context) error {
}
func non200Err(response *http.Response) error {
bodyBytes, err := io.ReadAll(response.Body)
bodyBytes, err := io.ReadAll(io.LimitReader(response.Body, client.MaxErrBodySize))
var errMessage ErrorMessage
var body string
if err != nil {

View File

@@ -10,11 +10,17 @@ import (
"github.com/pkg/errors"
)
const (
MaxBodySize int64 = 1 << 23 // 8MB default, WithMaxBodySize can override
MaxErrBodySize int64 = 1 << 17 // 128KB
)
// Client is a wrapper object around the HTTP client.
type Client struct {
hc *http.Client
baseURL *url.URL
token string
hc *http.Client
baseURL *url.URL
token string
maxBodySize int64
}
// NewClient constructs a new client with the provided options (ex WithTimeout).
@@ -26,8 +32,9 @@ func NewClient(host string, opts ...ClientOpt) (*Client, error) {
return nil, err
}
c := &Client{
hc: &http.Client{},
baseURL: u,
hc: &http.Client{},
baseURL: u,
maxBodySize: MaxBodySize,
}
for _, o := range opts {
o(c)
@@ -72,7 +79,7 @@ func (c *Client) NodeURL() string {
// Get is a generic, opinionated GET function to reduce boilerplate amongst the getters in this package.
func (c *Client) Get(ctx context.Context, path string, opts ...ReqOption) ([]byte, error) {
u := c.baseURL.ResolveReference(&url.URL{Path: path})
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), http.NoBody)
if err != nil {
return nil, err
}
@@ -89,7 +96,7 @@ func (c *Client) Get(ctx context.Context, path string, opts ...ReqOption) ([]byt
if r.StatusCode != http.StatusOK {
return nil, Non200Err(r)
}
b, err := io.ReadAll(r.Body)
b, err := io.ReadAll(io.LimitReader(r.Body, c.maxBodySize))
if err != nil {
return nil, errors.Wrap(err, "error reading http response body")
}

View File

@@ -25,16 +25,16 @@ var ErrInvalidNodeVersion = errors.New("invalid node version response")
var ErrConnectionIssue = errors.New("could not connect")
// Non200Err is a function that parses an HTTP response to handle responses that are not 200 with a formatted error.
func Non200Err(response *http.Response) error {
bodyBytes, err := io.ReadAll(response.Body)
func Non200Err(r *http.Response) error {
b, err := io.ReadAll(io.LimitReader(r.Body, MaxErrBodySize))
var body string
if err != nil {
body = "(Unable to read response body.)"
} else {
body = "response body:\n" + string(bodyBytes)
body = "response body:\n" + string(b)
}
msg := fmt.Sprintf("code=%d, url=%s, body=%s", response.StatusCode, response.Request.URL, body)
switch response.StatusCode {
msg := fmt.Sprintf("code=%d, url=%s, body=%s", r.StatusCode, r.Request.URL, body)
switch r.StatusCode {
case http.StatusNotFound:
return errors.Wrap(ErrNotFound, msg)
default:

View File

@@ -46,3 +46,10 @@ func WithAuthenticationToken(token string) ClientOpt {
c.token = token
}
}
// WithMaxBodySize overrides the default max body size of 8MB.
func WithMaxBodySize(size int64) ClientOpt {
return func(c *Client) {
c.maxBodySize = size
}
}

View File

@@ -55,7 +55,7 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc
// PeerDASIsActive checks whether peerDAS is active at the provided slot.
func PeerDASIsActive(slot primitives.Slot) bool {
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
return slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
}
// CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.

View File

@@ -15,8 +15,6 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/sirupsen/logrus"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
@@ -25,6 +23,7 @@ import (
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)
type ListenerRebooter interface {
@@ -139,8 +138,7 @@ func (l *listenerWrapper) RebootListener() error {
}
// RefreshPersistentSubnets checks that we are tracking our local persistent subnets for a variety of gossip topics.
// This routine checks for our attestation, sync committee and data column subnets and updates them if they have
// been rotated.
// This routine verifies and updates our attestation and sync committee subnets if they have been rotated.
func (s *Service) RefreshPersistentSubnets() {
// Return early if discv5 service isn't running.
if s.dv5Listener == nil || !s.isInitialized() {

View File

@@ -37,44 +37,6 @@ func (s *Service) connectToPeer(conn network.Conn) {
}).Debug("Initiate peer connection")
}
func (s *Service) disconnectFromPeer(
conn network.Conn,
goodByeFunc func(ctx context.Context, id peer.ID) error,
badPeerErr error,
) {
// Get the remote peer ID.
remotePeerID := conn.RemotePeer()
// Get the direction of the connection.
direction := conn.Stat().Direction.String()
// Get the remote peer multiaddr.
remotePeerMultiAddr := peerMultiaddrString(conn)
// Set the peer to disconnecting state.
s.peers.SetConnectionState(remotePeerID, peers.Disconnecting)
// Only attempt a goodbye if we are still connected to the peer.
if s.host.Network().Connectedness(remotePeerID) == network.Connected {
if err := goodByeFunc(context.TODO(), remotePeerID); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}
// Get the remaining active peers.
activePeerCount := len(s.peers.Active())
log.
WithError(badPeerErr).
WithFields(logrus.Fields{
"multiaddr": remotePeerMultiAddr,
"direction": direction,
"remainingActivePeers": activePeerCount,
}).
Debug("Initiate peer disconnection")
s.peers.SetConnectionState(remotePeerID, peers.Disconnected)
}
func (s *Service) disconnectFromPeerOnError(
conn network.Conn,
goodByeFunc func(ctx context.Context, id peer.ID) error,

View File

@@ -118,6 +118,15 @@ func NewStatus(ctx context.Context, config *StatusConfig) *Status {
}
}
func (p *Status) UpdateENR(record *enr.Record, pid peer.ID) {
p.store.Lock()
defer p.store.Unlock()
if peerData, ok := p.store.PeerData(pid); ok {
peerData.Enr = record
}
}
// Scorers exposes peer scoring management service.
func (p *Status) Scorers() *scorers.Service {
return p.scorers
@@ -160,14 +169,6 @@ func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, dire
p.addIpToTracker(pid)
}
func (p *Status) UpdateENR(record *enr.Record, pid peer.ID) {
p.store.Lock()
defer p.store.Unlock()
if peerData, ok := p.store.PeerData(pid); ok {
peerData.Enr = record
}
}
// Address returns the multiaddress of the given remote peer.
// This will error if the peer does not exist.
func (p *Status) Address(pid peer.ID) (ma.Multiaddr, error) {

View File

@@ -26,12 +26,14 @@ import (
"github.com/sirupsen/logrus"
)
var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
var syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
var (
attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
var custodySubnetCountEnrKey = params.BeaconNetworkConfig().CustodySubnetCountKey
attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
custodySubnetCountEnrKey = params.BeaconNetworkConfig().CustodySubnetCountKey
)
// The value used with the subnet, in order
// to create an appropriate key to retrieve

View File

@@ -54,7 +54,7 @@ func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
}
// RefreshPersistentSubnets .
func (MockPeerManager) RefreshPersistentSubnets() {}
func (*MockPeerManager) RefreshPersistentSubnets() {}
// FindPeersWithSubnet .
func (*MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {

View File

@@ -3,7 +3,6 @@ package lookup
import (
"context"
"fmt"
"math"
"strconv"
"strings"
@@ -470,26 +469,30 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices map[uint
// Get the slot of the block.
blockSlot := b.Block().Slot()
// Get the first peerDAS epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
// Compute the first peerDAS slot.
peerDASStartSlot := primitives.Slot(math.MaxUint64)
if eip7594ForkEpoch != primitives.Epoch(math.MaxUint64) {
peerDASStartSlot, err = slots.EpochStart(eip7594ForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}
}
// Is peerDAS enabled for this block?
isPeerDASEnabledForBlock := blockSlot >= peerDASStartSlot
// Create indices if needed.
if indices == nil {
indices = make(map[uint64]bool)
}
if !isPeerDASEnabledForBlock {
if params.PeerDASEnabled() {
// Get the first peerDAS epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
// Calculate the slot at which peerDAS starts.
peerDASStartSlot, err := slots.EpochStart(eip7594ForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}
// Is peerDAS active for this block?
isPeerDASActiveForBlock := blockSlot >= peerDASStartSlot
// If peerDAS is active for this block, then the database contains data columns.
if isPeerDASActiveForBlock {
return p.blobsFromStoredDataColumns(indices, root)
}
// Else, the database contains blobs.
return p.blobsFromStoredBlobs(indices, root)
}

View File

@@ -303,7 +303,7 @@ func (bs *Server) ListIndexedAttestationsElectra(
// that it was included in a block. The attestation may have expired.
// Refer to the ethereum consensus specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolResponse, error) {
atts, err := attestationsFromPool[*ethpb.Attestation](req.PageSize, bs.AttestationsPool)
if err != nil {

View File

@@ -10,6 +10,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//api/client:go_default_library",
"//api/client/beacon:go_default_library",
"//beacon-chain/db:go_default_library",
"//config/params:go_default_library",

View File

@@ -4,11 +4,14 @@ import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api/client"
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/config/params"
)
const stateSizeLimit int64 = 1 << 29 // 512MB
// APIInitializer manages initializing the beacon node using checkpoint sync, retrieving the checkpoint state and root
// from the remote beacon node api.
type APIInitializer struct {
@@ -18,7 +21,7 @@ type APIInitializer struct {
// NewAPIInitializer creates an APIInitializer, handling the set up of a beacon node api client
// using the provided host string.
func NewAPIInitializer(beaconNodeHost string) (*APIInitializer, error) {
c, err := beacon.NewClient(beaconNodeHost)
c, err := beacon.NewClient(beaconNodeHost, client.WithMaxBodySize(stateSizeLimit))
if err != nil {
return nil, errors.Wrapf(err, "unable to parse beacon node url or hostname - %s", beaconNodeHost)
}
@@ -32,10 +35,9 @@ func (dl *APIInitializer) Initialize(ctx context.Context, d db.Database) error {
if err == nil && origin != params.BeaconConfig().ZeroHash {
log.Warnf("Origin checkpoint root %#x found in db, ignoring checkpoint sync flags", origin)
return nil
} else {
if !errors.Is(err, db.ErrNotFound) {
return errors.Wrap(err, "error while checking database for origin root")
}
}
if err != nil && !errors.Is(err, db.ErrNotFound) {
return errors.Wrap(err, "error while checking database for origin root")
}
od, err := beacon.DownloadFinalizedData(ctx, dl.c)
if err != nil {

View File

@@ -1713,7 +1713,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
// Fork epochs.
denebForkEpoch primitives.Epoch
eip7954ForkEpoch primitives.Epoch
eip7594ForkEpoch primitives.Epoch
// Current slot.
currentSlot uint64
@@ -1753,29 +1753,29 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
isError: false,
},
{
name: "All blocks are before EIP-7954 fork epoch",
name: "All blocks are before EIP-7594 fork epoch",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 26, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 27, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 28, hasBlobs: false}, // Before EIP-7594 fork epoch
},
batchSize: 32,
addedRODataColumns: [][]int{nil, nil, nil, nil},
isError: false,
},
{
name: "All blocks with commitments before are EIP-7954 fork epoch",
name: "All blocks with commitments before are EIP-7594 fork epoch",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7594 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7594 fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: false},
},
@@ -1785,12 +1785,12 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs but without any missing data columns",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7594 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7594 fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: true},
},
@@ -1808,11 +1808,11 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - one round needed",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before EIP-7594 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7594 fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: true},
{slot: 34, hasBlobs: true},
@@ -1916,7 +1916,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - partial responses",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 33, hasBlobs: true},
@@ -1970,7 +1970,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - first response is invalid",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 38, hasBlobs: true},
@@ -2004,7 +2004,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - first response is empty",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{{slot: 38, hasBlobs: true}},
storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2033,7 +2033,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - no response at all",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{{slot: 38, hasBlobs: true}},
storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2056,7 +2056,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - request has to be split",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
eip7594ForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 32, hasBlobs: true}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: true}, // 4
@@ -2177,7 +2177,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
params.BeaconConfig().DenebForkEpoch = tc.denebForkEpoch
// Set the EIP-7594 fork epoch.
params.BeaconConfig().Eip7594ForkEpoch = tc.eip7954ForkEpoch
params.BeaconConfig().Eip7594ForkEpoch = tc.eip7594ForkEpoch
// Save the blocks in the store.
storage := make(map[[fieldparams.RootLength]byte][]int)

View File

@@ -754,24 +754,38 @@ func (s *Service) subscribeDynamicWithColumnSubnets(
handle subHandler,
digest [4]byte,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().DataColumnSidecarSubnetCount)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
wantedSubs := s.retrieveActiveColumnSubnets()
for _, idx := range wantedSubs {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topicFormat, digest, idx), validate, handle)
}
go func() {
// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
for {
select {
case <-s.ctx.Done():
@@ -781,7 +795,7 @@ func (s *Service) subscribeDynamicWithColumnSubnets(
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
continue

View File

@@ -42,11 +42,13 @@ var downloadCmd = &cli.Command{
},
}
const stateSizeLimit int64 = 1 << 29 // 512MB to accommodate future state growth
func cliActionDownload(_ *cli.Context) error {
ctx := context.Background()
f := downloadFlags
opts := []client.ClientOpt{client.WithTimeout(f.Timeout)}
opts := []client.ClientOpt{client.WithTimeout(f.Timeout), client.WithMaxBodySize(stateSizeLimit)}
client, err := beacon.NewClient(downloadFlags.BeaconNodeHost, opts...)
if err != nil {
return err

View File

@@ -105,7 +105,7 @@ func (MetadataV0) Version() int {
// MetadataV1
// ----------
// MetadataV1 is a convenience wrapper around our metadata v2 protobuf object.
// MetadataV1 is a convenience wrapper around our metadata v1 protobuf object.
type MetadataV1 struct {
md *pb.MetaDataV1
}

View File

@@ -91,7 +91,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
rpc AttestationPool(AttestationPoolRequest) returns (AttestationPoolResponse) {
option (google.api.http) = {
get: "/eth/v1alpha1/beacon/attestations/pool"
@@ -106,7 +106,7 @@ service BeaconChain {
// that it was included in a block. The attestation may have expired.
// Refer to the Ethereum Beacon Chain specification for more details on how
// attestations are processed and when they are no longer valid.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/core/0_beacon-chain.md#attestations
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
rpc AttestationPoolElectra(AttestationPoolRequest) returns (AttestationPoolElectraResponse) {
option (google.api.http) = {
get: "/eth/v1alpha1/beacon/attestations/pool_electra"