Compare commits

...

22 Commits

Author SHA1 Message Date
terence tsao
d0f5745d1f Log bad att 2025-11-02 16:19:35 -08:00
Manu NALEPA
b2a9db0826 BeaconBlockContainerToSignedBeaconBlock: Add Fulu. (#15940) 2025-11-01 23:18:42 +00:00
Manu NALEPA
040661bd68 Update go-netroute to v0.4.0. (#15949) 2025-10-31 20:46:06 +00:00
fernantho
d3bd0eaa30 SSZ-QL: update "path parsing" data types (#15935)
* updated path processing data types, refactored ParsePath and fixed tests

* updated generalized index accordingly, changed input parameter path type from []PathElement to Path

* updated query.go accordingly, changed input parameter path type from []PathElement to Path

* added descriptive changelog

* Update encoding/ssz/query/path.go

Co-authored-by: Jun Song <87601811+syjn99@users.noreply.github.com>

* Added documentation for Path struct and renamed  to  for clarity

* Update encoding/ssz/query/path.go

Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>

* updated changelog to its correct type: Changed

* updated outdated comment in generalized_index.go and removed test in generalized_index_test.go as this one belongs in path_test.go

* Added validateRawPath with strict raw-path validation only - no raw-path fixing is added. Added test suite covering

* added extra tests for wrongly formatted paths

---------

Co-authored-by: Jun Song <87601811+syjn99@users.noreply.github.com>
Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-10-31 17:37:59 +00:00
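For readers unfamiliar with SSZ-QL paths: a raw path is a dotted string such as `validators[0].effective_balance` that is split into typed elements. Below is a minimal sketch of the strict, regex-based raw-path validation this commit describes; the grammar and names are illustrative only, not Prysm's actual `Path`/`PathElement` types.

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// pathElemRe matches one illustrative element: a snake_case field name
// optionally followed by bracketed numeric indices, e.g. "balances[3]".
var pathElemRe = regexp.MustCompile(`^[a-z0-9_]+(\[\d+\])*$`)

// validateRawPath rejects malformed raw paths instead of trying to fix them,
// mirroring the strict-validation-only approach described above.
func validateRawPath(raw string) ([]string, error) {
	elems := strings.Split(raw, ".")
	for _, e := range elems {
		if !pathElemRe.MatchString(e) {
			return nil, fmt.Errorf("invalid path element %q in %q", e, raw)
		}
	}
	return elems, nil
}

func main() {
	elems, err := validateRawPath("validators[0].effective_balance")
	fmt.Println(elems, err) // [validators[0] effective_balance] <nil>
}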
Preston Van Loon
577899bfec P2p active val count lock (#15955)
* Add a lock for p2p computation of active validator count and limit only to topics that need it.

* Changelog fragment

* Update gossip_scoring_params.go

Wrap errors
2025-10-31 15:25:18 +00:00
Muzry
374bae9c81 Fix incorrect version used when sending attestation version in Fulu (#15950)
* Fix incorrect version used when sending attestation version in Fulu

* update typo

* fix Eth-Consensus-Version in submit_signed_aggregate_proof.go
2025-10-31 13:17:44 +00:00
kasey
3e0492a636 also ignore errors from readdirnames (#15947)
* also ignore errors from readdirnames

* test case for empty blobs dir

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2025-10-30 19:02:25 +00:00
rocksload
5a1a5b5ae5 refactor: use slices.Contains to simplify code (#15646)
Signed-off-by: rocksload <rocksload@outlook.com>
2025-10-29 14:40:33 +00:00
james-prysm
dbb2f0b047 changelog (#15929) 2025-10-28 15:42:05 +00:00
Manu NALEPA
7b3c11c818 Do not serve sidecars if corresponding block is not available in the database (#15933)
* Implement `AvailableBlocks`.

* `blobSidecarByRootRPCHandler`: Do not serve a sidecar if the corresponding block is not available.

* `dataColumnSidecarByRootRPCHandler`: Do not do extra work that is only needed for TRACE logging.

* `TestDataColumnSidecarsByRootRPCHandler`: Re-arrange (no functional change).

* `TestDataColumnSidecarsByRootRPCHandler`: Save blocks corresponding to sidecars into DB.

* `dataColumnSidecarByRootRPCHandler`: Do not serve a sidecar if the corresponding block is not available.

* Add changelog

* `TestDataColumnSidecarsByRootRPCHandler`: Use `assert` instead of `require` in goroutines.

https://github.com/stretchr/testify?tab=readme-ov-file#require-package
2025-10-28 15:39:35 +00:00
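On the last point above: testify's `require` functions call `t.FailNow`, which the `testing` package only permits from the goroutine running the test, whereas `assert` merely records the failure. A minimal illustration using testify directly; `doWork` is a hypothetical stand-in for the code under test.

package example_test

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/assert"
)

// doWork is a hypothetical stand-in for the handler under test.
func doWork() error { return nil }

func TestServeConcurrently(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// require.NoError would call t.FailNow from a non-test goroutine,
		// which is forbidden; assert.NoError records the failure and returns.
		assert.NoError(t, doWork())
	}()
	wg.Wait()
}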
Manu NALEPA
c9b34d556d Update go-netroute to v0.3.0 (#15934) 2025-10-28 12:57:14 +00:00
fernantho
10a2f0687b SSZ-QL: calculate generalized indices for elements (#15873)
* added tests for calculating generalized indices

* added first version of GI calculation walking the specified path with no recursion. Extended test coverage for bitlists and bitvectors.
Vectors need more testing

* refactored code. Detached PathElement processing, currently done at the beginning. Swap to regex to gain flexibility.

* added an updateRoot function with the GI formula. more refactoring

* added changelog

* replaced TODO tag

* updated some comments

* simplified code - removed duplicated code in processingLengthField function

* run gazelle

* merging all input path processing into path.go

* reviewed Jun's feedback

* removed unnecessary idx pointer var + fixed error with length data type (uint64 instead of uint8)

* refactored path.go after merging path elements from generalized_indices.go

* re-computed GIs for tests as VariableTestContainer added a new field.

* added minor comment - rawPath MUST be snake case

removed extractFieldName func.

* fixed vector GI calculation - updated tests GIs

* removed updateRoot function in favor of inline code

* path input data enforced to be snake case

* added sanity checks for accessing out-of-bounds element indices - checked against vector.length/list.limit

* fixed issues triggered after merging develop

* Removed redundant comment

Co-authored-by: Jun Song <87601811+syjn99@users.noreply.github.com>

* removed unreachable condition as `strings.Split` always returns a slice with length >= 1

If s does not contain sep and sep is not empty, Split returns a slice of
length 1 whose only element is s.

* added tests to cover edge cases + cleaned code (toLower is no longer needed in the extractFieldName function)

* added Jun's feedback + more testing

* postponed snake case conversion to do it on a per-element basis. Added more testing focused mainly on snake case conversion

* addressed several of Jun's comments.

* added sanity check to prevent taking the length of a multi-dimensional array. Added more tests with extended paths

* Update encoding/ssz/query/generalized_index.go

Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>

* Update encoding/ssz/query/generalized_index.go

Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>

* Update encoding/ssz/query/generalized_index.go

Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>

* placed constant bitsPerChunk in the right place. Exported BitsPerChunk and BytesPerChunk and updated code that uses them

* added helpers for computing GI of each data type

* changed %q in favor of %s

* Update encoding/ssz/query/path.go

Co-authored-by: Jun Song <87601811+syjn99@users.noreply.github.com>

* removed the least restrictive condition isBasicType

* replaced the length of containerInfo.order with containerInfo.fields for clarity

* removed outdated comment

* removed toSnakeCase conversion.

* moved isBasicType func to its natural place, SSZType

* cosmetic refactor

- renamed itemLengthFromInfo to itemLength (same name as in the spec).
- arranged all SSZ helpers.

* cleaned tests

* renamed "root" to "index"

* removed unnecessary check for negative integers. Replaced %q with %s.

* refactored regex variables and prevented re-assignment

* added length regex explanation

* added more testing for stressing regex for path processing

* renamed currentIndex to parentIndex for clarity and documented the returns from calculate<Type>GeneralizedIndex functions

* Update encoding/ssz/query/generalized_index.go

Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>

* run gazelle

* fixed a never-asserted error. Updated the error message

---------

Co-authored-by: Jun Song <87601811+syjn99@users.noreply.github.com>
Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-10-27 23:27:34 +00:00
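For context on the formula behind these commits: in SSZ Merkleization, descending from a subtree rooted at generalized index `g` into the `i`-th of `n` leaf chunks multiplies `g` by the padded tree width. A spec-level sketch, not Prysm's actual API:

// nextPowerOfTwo rounds n (n >= 1) up to the nearest power of two.
func nextPowerOfTwo(n uint64) uint64 {
	p := uint64(1)
	for p < n {
		p *= 2
	}
	return p
}

// childGeneralizedIndex descends one level in the Merkle tree: it returns the
// generalized index of the i-th of chunkCount chunks under the subtree rooted
// at generalized index parent.
func childGeneralizedIndex(parent, chunkCount, i uint64) uint64 {
	return parent*nextPowerOfTwo(chunkCount) + i
}

// Example: field 2 of a 5-field container at the root (parent = 1) sits at
// childGeneralizedIndex(1, 5, 2) == 1*8 + 2 == 10. Lists add one extra level
// for the length mix-in: descend to parent*2 before indexing the data chunks.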
Manu NALEPA
4fb75d6d0b Add some metrics improvements (#15922)
* Define TCP and QUIC as `InternetProtocol` (no functional change).

* Group types. (No functional changes)

* Rename variables and use range syntax.

* Add `p2pMaxPeers` and `p2pPeerCountDirectionType` metrics

* `p2p_subscribed_topic_peer_total`: Reset to avoid dangling values.

* `validateConfig`:
- Use `Warning` with fields instead of `Warnf`.
- Avoid both modifying the input value in place and returning it.

* Add `p2p_minimum_peers_per_subnet` metric.

* `beaconConfig` => `cfg`.

https://github.com/OffchainLabs/prysm/pull/15880#discussion_r2436826215

* Add changelog

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2025-10-26 15:16:05 +00:00
terence
6d596edea2 Use SlotTicker instead of time.Ticker for attestation pool pruning (#15917)
* Use SlotTicker instead of time.Ticker for attestation pool pruning

* Offset one second before slot start
2025-10-24 15:35:26 +00:00
Bastin
9153c5a202 light client logging (#15927) 2025-10-24 14:42:27 +00:00
james-prysm
26ce94e224 removes misleading keymanager info log (#15926)
* simple change

* fixing test
2025-10-24 14:28:30 +00:00
terence
255ea2fac1 Return optimistic response only when handling blinded blocks (#15925)
* Return optimistic response only when handling blinded blocks in proposer

* Remove blind condition
2025-10-24 03:37:32 +00:00
terence
46bc81b4c8 Add metric to track data columns recovered from execution layer (#15924) 2025-10-23 15:50:25 +00:00
kasey
9c4774b82e default new blob storage layouts to by-epoch (#15904)
* default new blob storage layouts to by-epoch

also, do not log migration message until we see a directory that needs to be migrated

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* manu feedback

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2025-10-22 20:09:18 +00:00
terence
7dd4f5948c Update consensus spec tests to v1.6.0-beta.1 with new hashes and URL template (#15918) 2025-10-22 18:22:19 +00:00
Radosław Kapka
2f090c52d9 Allow custom headers in validator client HTTP requests (#15884)
* Allow custom headers in validator client HTTP requests

* changelog <3

* improve flag description

* Bastin's review

* James' review

* add godoc for NodeConnectionOption
2025-10-22 13:47:35 +00:00
Manu NALEPA
3ecb5d0b67 Remove "Reading static P2P private key from a file" log if Fulu is enabled. (#15913) 2025-10-22 11:52:31 +00:00
131 changed files with 2303 additions and 489 deletions

View File

@@ -4,6 +4,29 @@ All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
## [v6.1.4](https://github.com/prysmaticlabs/prysm/compare/v6.1.3...v6.1.4) - 2025-10-24
This release includes a bug fix affecting block proposals in rare cases, along with an important update for Windows users running post-Fusaka fork.
### Added
- SSZ-QL: Add endpoints for `BeaconState`/`BeaconBlock`. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15888)
- Add native state diff type and marshalling functions. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15250)
- Update the earliest available slot after pruning operations in beacon chain database pruner. This ensures the P2P layer accurately knows which historical data is available after pruning, preventing nodes from advertising or attempting to serve data that has been pruned. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15694)
### Fixed
- Correctly advertise (in ENR and beacon API) attestation subnets when using `--subscribe-all-subnets`. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15880)
- `randomPeer`: Return if the context is cancelled when waiting for peers. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15876)
- Improve the error message when the byte count read from disk for a data column sidecar is lower than expected (mostly because the file is truncated). [[PR]](https://github.com/prysmaticlabs/prysm/pull/15881)
- Delete the genesis state file when --clear-db / --force-clear-db is specified. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15883)
- Fix sync committee subscription to use subnet indices instead of committee indices. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15885)
- Fixed metadata extraction on Windows by correctly splitting file paths. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15899)
- `VerifyDataColumnsSidecarKZGProofs`: Check if sizes match. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15892)
- Fix recoverStateSummary to persist state summaries in stateSummaryBucket instead of stateBucket (#15896). [[PR]](https://github.com/prysmaticlabs/prysm/pull/15896)
- `updateCustodyInfoInDB`: Use `NumberOfCustodyGroups` instead of `NumberOfColumns`. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15908)
- Sync committee uses correct state to calculate position. [[PR]](https://github.com/prysmaticlabs/prysm/pull/15905)
## [v6.1.3](https://github.com/prysmaticlabs/prysm/compare/v6.1.2...v6.1.3) - 2025-10-20
This release has several important beacon API and p2p fixes.

View File

@@ -253,16 +253,16 @@ filegroup(
url = "https://github.com/ethereum/EIPs/archive/5480440fe51742ed23342b68cf106cefd427e39d.tar.gz",
)
consensus_spec_version = "v1.6.0-beta.0"
consensus_spec_version = "v1.6.0-beta.1"
load("@prysm//tools:download_spectests.bzl", "consensus_spec_tests")
consensus_spec_tests(
name = "consensus_spec_tests",
flavors = {
"general": "sha256-rT3jQp2+ZaDiO66gIQggetzqr+kGeexaLqEhbx4HDMY=",
"minimal": "sha256-wowwwyvd0KJLsE+oDOtPkrhZyJndJpJ0lbXYsLH6XBw=",
"mainnet": "sha256-4ZLrLNeO7NihZ4TuWH5V5fUhvW9Y3mAPBQDCqrfShps=",
"general": "sha256-oEj0MTViJHjZo32nABK36gfvSXpbwkBk/jt6Mj7pWFI=",
"minimal": "sha256-cS4NPv6IRBoCSmWomQ8OEo8IsVNW9YawUFqoRZQBUj4=",
"mainnet": "sha256-BYuLndMPAh4p13IRJgNfVakrCVL69KRrNw2tdc3ETbE=",
},
version = consensus_spec_version,
)
@@ -278,7 +278,7 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-sBe3Rx8zGq9IrvfgIhZQpYidGjy3mE1SiCb6/+pjLdY=",
integrity = "sha256-yrq3tdwPS8Ri+ueeLAHssIT3ssMrX7zvHiJ8Xf9GVYs=",
strip_prefix = "consensus-specs-" + consensus_spec_version[1:],
url = "https://github.com/ethereum/consensus-specs/archive/refs/tags/%s.tar.gz" % consensus_spec_version,
)

View File

@@ -6,6 +6,7 @@ go_library(
"client.go",
"errors.go",
"options.go",
"transport.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/api/client",
visibility = ["//visibility:public"],
@@ -14,7 +15,13 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["client_test.go"],
srcs = [
"client_test.go",
"transport_test.go",
],
embed = [":go_default_library"],
deps = ["//testing/require:go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

api/client/transport.go Normal file
View File

@@ -0,0 +1,25 @@
package client
import "net/http"
// CustomHeadersTransport adds custom headers to each request
type CustomHeadersTransport struct {
base http.RoundTripper
headers map[string][]string
}
func NewCustomHeadersTransport(base http.RoundTripper, headers map[string][]string) *CustomHeadersTransport {
return &CustomHeadersTransport{
base: base,
headers: headers,
}
}
func (t *CustomHeadersTransport) RoundTrip(req *http.Request) (*http.Response, error) {
for header, values := range t.headers {
for _, value := range values {
req.Header.Add(header, value)
}
}
return t.base.RoundTrip(req)
}
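A possible way to wire the new transport into a client; `newClientWithHeaders` is an illustrative helper, not part of this diff:

// newClientWithHeaders wraps the default transport so every request sent
// through the returned client carries the given headers.
func newClientWithHeaders(headers map[string][]string) *http.Client {
	return &http.Client{
		Transport: NewCustomHeadersTransport(http.DefaultTransport, headers),
	}
}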

api/client/transport_test.go Normal file

View File

@@ -0,0 +1,25 @@
package client
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
type noopTransport struct{}
func (*noopTransport) RoundTrip(*http.Request) (*http.Response, error) {
return nil, nil
}
func TestRoundTrip(t *testing.T) {
tr := &CustomHeadersTransport{base: &noopTransport{}, headers: map[string][]string{"key1": []string{"value1", "value2"}, "key2": []string{"value3"}}}
req := httptest.NewRequest("GET", "http://foo", nil)
_, err := tr.RoundTrip(req)
require.NoError(t, err)
assert.DeepEqual(t, []string{"value1", "value2"}, req.Header.Values("key1"))
assert.DeepEqual(t, []string{"value3"}, req.Header.Values("key2"))
}

View File

@@ -472,8 +472,8 @@ func (s *Service) removeStartupState() {
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSubscribedToAllDataSubnets := flags.Get().SubscribeAllDataSubnets
beaconConfig := params.BeaconConfig()
custodyRequirement := beaconConfig.CustodyRequirement
cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement
// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
@@ -493,7 +493,7 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
// Compute the custody group count.
custodyGroupCount := custodyRequirement
if isSubscribedToAllDataSubnets {
custodyGroupCount = beaconConfig.NumberOfCustodyGroups
custodyGroupCount = cfg.NumberOfCustodyGroups
}
// Safely compute the fulu fork slot.
@@ -536,11 +536,11 @@ func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db d
}
func fuluForkSlot() (primitives.Slot, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
fuluForkEpoch := beaconConfig.FuluForkEpoch
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
return beaconConfig.FarFutureSlot, nil
fuluForkEpoch := cfg.FuluForkEpoch
if fuluForkEpoch == cfg.FarFutureEpoch {
return cfg.FarFutureSlot, nil
}
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

View File

@@ -472,6 +472,36 @@ func (s *ChainService) HasBlock(ctx context.Context, rt [32]byte) bool {
return s.InitSyncBlockRoots[rt]
}
func (s *ChainService) AvailableBlocks(ctx context.Context, blockRoots [][32]byte) map[[32]byte]bool {
if s.DB == nil {
return nil
}
count := len(blockRoots)
availableRoots := make(map[[32]byte]bool, count)
notInDBRoots := make([][32]byte, 0, count)
for _, root := range blockRoots {
if s.DB.HasBlock(ctx, root) {
availableRoots[root] = true
continue
}
notInDBRoots = append(notInDBRoots, root)
}
if s.InitSyncBlockRoots == nil {
return availableRoots
}
for _, root := range notInDBRoots {
if s.InitSyncBlockRoots[root] {
availableRoots[root] = true
}
}
return availableRoots
}
// RecentBlockSlot mocks the same method in the chain service.
func (s *ChainService) RecentBlockSlot([32]byte) (primitives.Slot, error) {
return s.BlockSlot, nil

View File

@@ -3,6 +3,7 @@ package blockchain
import (
"context"
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filters"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -81,12 +82,10 @@ func (v *WeakSubjectivityVerifier) VerifyWeakSubjectivity(ctx context.Context, f
if err != nil {
return errors.Wrap(err, "error while retrieving block roots to verify weak subjectivity")
}
for _, root := range roots {
if v.root == root {
log.Info("Weak subjectivity check has passed!!")
v.verified = true
return nil
}
if slices.Contains(roots, v.root) {
log.Info("Weak subjectivity check has passed!!")
v.verified = true
return nil
}
return errors.Wrap(errWSBlockNotFoundInEpoch, fmt.Sprintf("root=%#x, epoch=%d", v.root, v.epoch))
}

View File

@@ -401,7 +401,7 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
return 0, errors.New("empty active indices list")
}
hashFunc := hash.CustomSHA256Hasher()
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
seedBuffer := make([]byte, len(seed)+8)
copy(seedBuffer, seed[:])
@@ -426,14 +426,14 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
offset := (i % 16) * 2
randomValue := uint64(randomBytes[offset]) | uint64(randomBytes[offset+1])<<8
if effectiveBal*fieldparams.MaxRandomValueElectra >= beaconConfig.MaxEffectiveBalanceElectra*randomValue {
if effectiveBal*fieldparams.MaxRandomValueElectra >= cfg.MaxEffectiveBalanceElectra*randomValue {
return candidateIndex, nil
}
} else {
binary.LittleEndian.PutUint64(seedBuffer[len(seed):], i/32)
randomByte := hashFunc(seedBuffer)[i%32]
if effectiveBal*fieldparams.MaxRandomByte >= beaconConfig.MaxEffectiveBalance*uint64(randomByte) {
if effectiveBal*fieldparams.MaxRandomByte >= cfg.MaxEffectiveBalance*uint64(randomByte) {
return candidateIndex, nil
}
}

View File

@@ -89,14 +89,14 @@ func CustodyGroups(nodeId enode.ID, custodyGroupCount uint64) ([]uint64, error)
// ComputeColumnsForCustodyGroup computes the columns for a given custody group.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#compute_columns_for_custody_group
func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
beaconConfig := params.BeaconConfig()
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
if custodyGroup >= numberOfCustodyGroups {
return nil, ErrCustodyGroupTooLarge
}
numberOfColumns := beaconConfig.NumberOfColumns
numberOfColumns := cfg.NumberOfColumns
columnsPerGroup := numberOfColumns / numberOfCustodyGroups
@@ -112,9 +112,9 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
// ComputeCustodyGroupForColumn computes the custody group for a given column.
// It is the reciprocal function of ComputeColumnsForCustodyGroup.
func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
if columnIndex >= numberOfColumns {
return 0, ErrIndexTooLarge

View File

@@ -84,10 +84,10 @@ func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validat
totalNodeBalance += validator.EffectiveBalance()
}
beaconConfig := params.BeaconConfig()
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
validatorCustodyRequirement := beaconConfig.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := beaconConfig.BalancePerAdditionalCustodyGroup
cfg := params.BeaconConfig()
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
validatorCustodyRequirement := cfg.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := cfg.BalancePerAdditionalCustodyGroup
count := totalNodeBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil
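To make the clamp concrete, assuming illustrative spec values of 32 ETH per additional custody group, a validator custody requirement of 8, and 128 custody groups: a node whose attached validators hold 1,600 ETH of effective balance yields count = 1600/32 = 50 groups, which falls inside [8, 128]; 64 ETH yields 2, raised to the floor of 8; and 8,192 ETH yields 256, capped at 128.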

View File

@@ -196,7 +196,7 @@ func TestAltairCompatible(t *testing.T) {
}
func TestCanUpgradeTo(t *testing.T) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
outerTestCases := []struct {
name string
@@ -205,32 +205,32 @@ func TestCanUpgradeTo(t *testing.T) {
}{
{
name: "Altair",
forkEpoch: &beaconConfig.AltairForkEpoch,
forkEpoch: &cfg.AltairForkEpoch,
upgradeFunc: time.CanUpgradeToAltair,
},
{
name: "Bellatrix",
forkEpoch: &beaconConfig.BellatrixForkEpoch,
forkEpoch: &cfg.BellatrixForkEpoch,
upgradeFunc: time.CanUpgradeToBellatrix,
},
{
name: "Capella",
forkEpoch: &beaconConfig.CapellaForkEpoch,
forkEpoch: &cfg.CapellaForkEpoch,
upgradeFunc: time.CanUpgradeToCapella,
},
{
name: "Deneb",
forkEpoch: &beaconConfig.DenebForkEpoch,
forkEpoch: &cfg.DenebForkEpoch,
upgradeFunc: time.CanUpgradeToDeneb,
},
{
name: "Electra",
forkEpoch: &beaconConfig.ElectraForkEpoch,
forkEpoch: &cfg.ElectraForkEpoch,
upgradeFunc: time.CanUpgradeToElectra,
},
{
name: "Fulu",
forkEpoch: &beaconConfig.FuluForkEpoch,
forkEpoch: &cfg.FuluForkEpoch,
upgradeFunc: time.CanUpgradeToFulu,
},
}
@@ -238,7 +238,7 @@ func TestCanUpgradeTo(t *testing.T) {
for _, otc := range outerTestCases {
params.SetupTestConfigCleanup(t)
*otc.forkEpoch = 5
params.OverrideBeaconConfig(beaconConfig)
params.OverrideBeaconConfig(cfg)
innerTestCases := []struct {
name string

View File

@@ -212,7 +212,8 @@ func filterNoop(_ string) bool {
return true
}
func isRootDir(p string) bool {
// IsBlockRootDir returns true if the path segment looks like a block root directory.
func IsBlockRootDir(p string) bool {
dir := filepath.Base(p)
return len(dir) == rootStringLen && strings.HasPrefix(dir, "0x")
}

View File

@@ -188,7 +188,7 @@ func TestListDir(t *testing.T) {
name: "root filter",
dirPath: ".",
expected: []string{childlessBlob.name, blobWithSsz.name, blobWithSszAndTmp.name},
filter: isRootDir,
filter: IsBlockRootDir,
},
{
name: "ssz filter",

View File

@@ -19,12 +19,14 @@ import (
const (
// Full root in directory will be 66 chars, eg:
// >>> len('0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555') == 66
rootStringLen = 66
sszExt = "ssz"
partExt = "part"
periodicEpochBaseDir = "by-epoch"
rootStringLen = 66
sszExt = "ssz"
partExt = "part"
)
// PeriodicEpochBaseDir is the name of the base directory for the by-epoch layout.
const PeriodicEpochBaseDir = "by-epoch"
const (
LayoutNameFlat = "flat"
LayoutNameByEpoch = "by-epoch"
@@ -130,11 +132,11 @@ func migrateLayout(fs afero.Fs, from, to fsLayout, cache *blobStorageSummaryCach
if iter.atEOF() {
return errLayoutNotDetected
}
log.WithField("fromLayout", from.name()).WithField("toLayout", to.name()).Info("Migrating blob filesystem layout. This one-time operation can take extra time (up to a few minutes for systems with extended blob storage and a cold disk cache).")
lastMoved := ""
parentDirs := make(map[string]bool) // this map should have < 65k keys by design
moved := 0
dc := newDirCleaner()
migrationLogged := false
for ident, err := iter.next(); !errors.Is(err, io.EOF); ident, err = iter.next() {
if err != nil {
if errors.Is(err, errIdentFailure) {
@@ -146,6 +148,11 @@ func migrateLayout(fs afero.Fs, from, to fsLayout, cache *blobStorageSummaryCach
}
return errors.Wrapf(errMigrationFailure, "failed to iterate previous layout structure while migrating blobs, err=%s", err.Error())
}
if !migrationLogged {
log.WithField("fromLayout", from.name()).WithField("toLayout", to.name()).
Info("Migrating blob filesystem layout. This one-time operation can take extra time (up to a few minutes for systems with extended blob storage and a cold disk cache).")
migrationLogged = true
}
src := from.dir(ident)
target := to.dir(ident)
if src != lastMoved {

View File

@@ -34,7 +34,7 @@ func (l *periodicEpochLayout) name() string {
func (l *periodicEpochLayout) blockParentDirs(ident blobIdent) []string {
return []string{
periodicEpochBaseDir,
PeriodicEpochBaseDir,
l.periodDir(ident.epoch),
l.epochDir(ident.epoch),
}
@@ -50,28 +50,28 @@ func (l *periodicEpochLayout) notify(ident blobIdent) error {
// If before == 0, it won't be used as a filter and all idents will be returned.
func (l *periodicEpochLayout) iterateIdents(before primitives.Epoch) (*identIterator, error) {
_, err := l.fs.Stat(periodicEpochBaseDir)
_, err := l.fs.Stat(PeriodicEpochBaseDir)
if err != nil {
if os.IsNotExist(err) {
return &identIterator{eof: true}, nil // The directory is non-existent, which is fine; stop iteration.
}
return nil, errors.Wrapf(err, "error reading path %s", periodicEpochBaseDir)
return nil, errors.Wrapf(err, "error reading path %s", PeriodicEpochBaseDir)
}
// iterate root, which should have directories named by "period"
entries, err := listDir(l.fs, periodicEpochBaseDir)
entries, err := listDir(l.fs, PeriodicEpochBaseDir)
if err != nil {
return nil, errors.Wrapf(err, "failed to list %s", periodicEpochBaseDir)
return nil, errors.Wrapf(err, "failed to list %s", PeriodicEpochBaseDir)
}
return &identIterator{
fs: l.fs,
path: periodicEpochBaseDir,
path: PeriodicEpochBaseDir,
// Please see comments on the `layers` field in `identIterator` if the role of the layers is unclear.
layers: []layoutLayer{
{populateIdent: populateNoop, filter: isBeforePeriod(before)},
{populateIdent: populateEpoch, filter: isBeforeEpoch(before)},
{populateIdent: populateRoot, filter: isRootDir}, // extract root from path
{populateIdent: populateIndex, filter: isSszFile}, // extract index from filename
{populateIdent: populateRoot, filter: IsBlockRootDir}, // extract root from path
{populateIdent: populateIndex, filter: isSszFile}, // extract index from filename
},
entries: entries,
}, nil
@@ -98,7 +98,7 @@ func (l *periodicEpochLayout) epochDir(epoch primitives.Epoch) string {
}
func (l *periodicEpochLayout) periodDir(epoch primitives.Epoch) string {
return filepath.Join(periodicEpochBaseDir, fmt.Sprintf("%d", periodForEpoch(epoch)))
return filepath.Join(PeriodicEpochBaseDir, fmt.Sprintf("%d", periodForEpoch(epoch)))
}
func (l *periodicEpochLayout) sszPath(n blobIdent) string {

View File

@@ -30,7 +30,7 @@ func (l *flatLayout) iterateIdents(before primitives.Epoch) (*identIterator, err
if os.IsNotExist(err) {
return &identIterator{eof: true}, nil // The directory is non-existent, which is fine; stop iteration.
}
return nil, errors.Wrapf(err, "error reading path %s", periodicEpochBaseDir)
return nil, errors.Wrap(err, "error reading blob base dir")
}
entries, err := listDir(l.fs, ".")
if err != nil {
@@ -199,10 +199,10 @@ func (l *flatSlotReader) isSSZAndBefore(fname string) bool {
// the epoch can be determined.
func isFlatCachedAndBefore(cache *blobStorageSummaryCache, before primitives.Epoch) func(string) bool {
if before == 0 {
return isRootDir
return IsBlockRootDir
}
return func(p string) bool {
if !isRootDir(p) {
if !IsBlockRootDir(p) {
return false
}
root, err := rootFromPath(p)

View File

@@ -28,6 +28,7 @@ type ReadOnlyDatabase interface {
BlocksBySlot(ctx context.Context, slot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error)
BlockRootsBySlot(ctx context.Context, slot primitives.Slot) (bool, [][32]byte, error)
HasBlock(ctx context.Context, blockRoot [32]byte) bool
AvailableBlocks(ctx context.Context, blockRoots [][32]byte) map[[32]byte]bool
GenesisBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, error)
GenesisBlockRoot(ctx context.Context) ([32]byte, error)
IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool

View File

@@ -336,6 +336,42 @@ func (s *Store) HasBlock(ctx context.Context, blockRoot [32]byte) bool {
return exists
}
// AvailableBlocks returns a set of roots indicating which blocks corresponding to `blockRoots` are available in the storage.
func (s *Store) AvailableBlocks(ctx context.Context, blockRoots [][32]byte) map[[32]byte]bool {
_, span := trace.StartSpan(ctx, "BeaconDB.AvailableBlocks")
defer span.End()
count := len(blockRoots)
availableRoots := make(map[[32]byte]bool, count)
// First, check the cache for each block root.
notInCacheRoots := make([][32]byte, 0, count)
for _, root := range blockRoots {
if v, ok := s.blockCache.Get(string(root[:])); v != nil && ok {
availableRoots[root] = true
continue
}
notInCacheRoots = append(notInCacheRoots, root)
}
// Next, check the database for the remaining block roots.
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
for _, root := range notInCacheRoots {
if bkt.Get(root[:]) != nil {
availableRoots[root] = true
}
}
return nil
}); err != nil {
panic(err) // lint:nopanic -- View never returns an error.
}
return availableRoots
}
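A sketch of the intended call pattern, mirroring the blob-sidecar handler change later in this compare: pre-filter the requested roots once, then skip any sidecar whose block is missing. `serveAvailable` is an illustrative helper, not part of the diff:

// serveAvailable filters the requested roots through AvailableBlocks and
// invokes serve only for roots whose block is present in cache or DB.
func serveAvailable(ctx context.Context, s *Store, roots [][32]byte, serve func([32]byte)) {
	available := s.AvailableBlocks(ctx, roots)
	for _, root := range roots {
		if !available[root] {
			continue // corresponding block is unavailable; skip its sidecars
		}
		serve(root)
	}
}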
// BlocksBySlot retrieves a list of beacon blocks and its respective roots by slot.
func (s *Store) BlocksBySlot(ctx context.Context, slot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.BlocksBySlot")

View File

@@ -656,6 +656,44 @@ func TestStore_BlocksCRUD_NoCache(t *testing.T) {
}
}
func TestAvailableBlocks(t *testing.T) {
ctx := t.Context()
db := setupDB(t)
b0, b1, b2 := util.NewBeaconBlock(), util.NewBeaconBlock(), util.NewBeaconBlock()
b0.Block.Slot, b1.Block.Slot, b2.Block.Slot = 10, 20, 30
sb0, err := blocks.NewSignedBeaconBlock(b0)
require.NoError(t, err)
r0, err := b0.Block.HashTreeRoot()
require.NoError(t, err)
// Save b0 but remove it from cache.
err = db.SaveBlock(ctx, sb0)
require.NoError(t, err)
db.blockCache.Del(string(r0[:]))
// b1 is not saved at all.
r1, err := b1.Block.HashTreeRoot()
require.NoError(t, err)
// Save b2 in cache and DB.
sb2, err := blocks.NewSignedBeaconBlock(b2)
require.NoError(t, err)
r2, err := b2.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveBlock(ctx, sb2))
require.NoError(t, err)
expected := map[[32]byte]bool{r0: true, r2: true}
actual := db.AvailableBlocks(ctx, [][32]byte{r0, r1, r2})
require.Equal(t, len(expected), len(actual))
for i := range expected {
require.Equal(t, true, actual[i])
}
}
func TestStore_Blocks_FiltersCorrectly(t *testing.T) {
for _, tt := range blockTests {
t.Run(tt.name, func(t *testing.T) {

View File

@@ -6,6 +6,7 @@ go_library(
"cache.go",
"helpers.go",
"lightclient.go",
"log.go",
"store.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/light-client",

View File

@@ -0,0 +1,5 @@
package light_client
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "light-client")

View File

@@ -14,7 +14,6 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
var ErrLightClientBootstrapNotFound = errors.New("light client bootstrap not found")

View File

@@ -12,6 +12,7 @@ import (
"os"
"os/signal"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
@@ -1135,10 +1136,8 @@ func (b *BeaconNode) registerLightClientStore() {
func hasNetworkFlag(cliCtx *cli.Context) bool {
for _, flag := range features.NetworkFlags {
for _, name := range flag.Names() {
if cliCtx.IsSet(name) {
return true
}
if slices.ContainsFunc(flag.Names(), cliCtx.IsSet) {
return true
}
}
return false

View File

@@ -10,11 +10,13 @@ import (
// pruneExpired prunes attestations pool on every slot interval.
func (s *Service) pruneExpired() {
ticker := time.NewTicker(s.cfg.pruneInterval)
defer ticker.Stop()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
offset := time.Duration(secondsPerSlot-1) * time.Second
slotTicker := slots.NewSlotTickerWithOffset(s.genesisTime, offset, secondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-ticker.C:
case <-slotTicker.C():
s.pruneExpiredAtts()
s.updateMetrics()
case <-s.ctx.Done():
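With mainnet's 12-second slots the offset is 11 seconds, so the ticker fires one second before each slot boundary. A sketch of the resulting tick schedule — not Prysm's actual SlotTicker implementation:

// nextTick computes when an offset slot ticker fires next:
// tick(n) = genesis + n*slotDuration + offset.
func nextTick(genesis time.Time, slotDuration, offset time.Duration, now time.Time) time.Time {
	n := now.Sub(genesis) / slotDuration // current slot number
	t := genesis.Add(n*slotDuration + offset)
	if !t.After(now) {
		t = t.Add(slotDuration) // offset already passed this slot; fire next slot
	}
	return t
}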

View File

@@ -17,7 +17,9 @@ import (
)
func TestPruneExpired_Ticker(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
// Need timeout longer than the offset (secondsPerSlot - 1) + some buffer
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot+5) * time.Second
ctx, cancel := context.WithTimeout(t.Context(), timeout)
defer cancel()
s, err := NewService(ctx, &Config{

View File

@@ -7,6 +7,7 @@ import (
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/sirupsen/logrus"
)
// This is the default queue size used if we have specified an invalid one.
@@ -63,12 +64,17 @@ func (cfg *Config) connManagerLowHigh() (int, int) {
return low, high
}
// validateConfig validates whether the values provided are accurate and will set
// the appropriate values for those that are invalid.
func validateConfig(cfg *Config) *Config {
if cfg.QueueSize == 0 {
log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize)
cfg.QueueSize = defaultPubsubQueueSize
// validateConfig validates whether the provided config has valid values and sets
// the invalid ones to default.
func validateConfig(cfg *Config) {
if cfg.QueueSize > 0 {
return
}
return cfg
log.WithFields(logrus.Fields{
"queueSize": cfg.QueueSize,
"default": defaultPubsubQueueSize,
}).Warning("Invalid pubsub queue size, setting the queue size to the default value")
cfg.QueueSize = defaultPubsubQueueSize
}

View File

@@ -259,11 +259,11 @@ func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 {
}
func fuluForkSlot() (primitives.Slot, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
fuluForkEpoch := beaconConfig.FuluForkEpoch
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
return beaconConfig.FarFutureSlot, nil
fuluForkEpoch := cfg.FuluForkEpoch
if fuluForkEpoch == cfg.FarFutureEpoch {
return cfg.FarFutureSlot, nil
}
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

View File

@@ -2,6 +2,7 @@ package p2p
import (
"context"
"fmt"
"math"
"net"
"reflect"
@@ -106,18 +107,26 @@ func peerScoringParams(colocationWhitelist []*net.IPNet) (*pubsub.PeerScoreParam
}
func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, error) {
activeValidators, err := s.retrieveActiveValidators()
if err != nil {
return nil, err
}
switch {
case strings.Contains(topic, GossipBlockMessage):
return defaultBlockTopicParams(), nil
case strings.Contains(topic, GossipAggregateAndProofMessage):
activeValidators, err := s.retrieveActiveValidators()
if err != nil {
return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipAggregateAndProofMessage, err)
}
return defaultAggregateTopicParams(activeValidators), nil
case strings.Contains(topic, GossipAttestationMessage):
activeValidators, err := s.retrieveActiveValidators()
if err != nil {
return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipAttestationMessage, err)
}
return defaultAggregateSubnetTopicParams(activeValidators), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
activeValidators, err := s.retrieveActiveValidators()
if err != nil {
return nil, fmt.Errorf("failed to compute active validator count for topic %s: %w", GossipSyncCommitteeMessage, err)
}
return defaultSyncSubnetTopicParams(activeValidators), nil
case strings.Contains(topic, GossipContributionAndProofMessage):
return defaultSyncContributionTopicParams(), nil
@@ -142,6 +151,8 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
}
func (s *Service) retrieveActiveValidators() (uint64, error) {
s.activeValidatorCountLock.Lock()
defer s.activeValidatorCountLock.Unlock()
if s.activeValidatorCount != 0 {
return s.activeValidatorCount, nil
}
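The rationale, per the commit message: topicScoreParams can run concurrently while the node subscribes to many topics, and retrieveActiveValidators both reads and lazily fills the cached count, so without activeValidatorCountLock several goroutines could race on activeValidatorCount and each pay the cost of recomputing it from the head state. Moving the call into the individual switch cases also means topics that do not need the count (for example blocks and sync contributions) never take the lock.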

View File

@@ -3,6 +3,7 @@ package p2p
import (
"strings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
@@ -26,12 +27,25 @@ var (
Help: "The number of peers in a given state.",
},
[]string{"state"})
p2pMaxPeers = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_max_peers",
Help: "The target maximum number of peers.",
})
p2pPeerCountDirectionType = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "p2p_peer_count_direction_type",
Help: "The number of peers in a given direction and type.",
},
[]string{"direction", "type"})
connectedPeersCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers",
Help: "Tracks the total number of connected libp2p peers by agent string",
},
[]string{"agent"},
)
minimumPeersPerSubnet = promauto.NewGauge(prometheus.GaugeOpts{
Name: "p2p_minimum_peers_per_subnet",
Help: "The minimum number of peers to connect to per subnet",
})
avgScoreConnectedClients = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "connected_libp2p_peers_average_scores",
Help: "Tracks the overall p2p scores of connected libp2p peers by agent string",
@@ -174,18 +188,26 @@ var (
)
func (s *Service) updateMetrics() {
store := s.Host().Peerstore()
connectedPeers := s.peers.Connected()
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(connectedPeers)))
p2pPeerCount.WithLabelValues("Disconnected").Set(float64(len(s.peers.Disconnected())))
p2pPeerCount.WithLabelValues("Connecting").Set(float64(len(s.peers.Connecting())))
p2pPeerCount.WithLabelValues("Disconnecting").Set(float64(len(s.peers.Disconnecting())))
p2pPeerCount.WithLabelValues("Bad").Set(float64(len(s.peers.Bad())))
store := s.Host().Peerstore()
numConnectedPeersByClient := make(map[string]float64)
upperTCP := strings.ToUpper(string(peers.TCP))
upperQUIC := strings.ToUpper(string(peers.QUIC))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperTCP).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("inbound", upperQUIC).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.QUIC))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperTCP).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.TCP))))
p2pPeerCountDirectionType.WithLabelValues("outbound", upperQUIC).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))))
connectedPeersCountByClient := make(map[string]float64)
peerScoresByClient := make(map[string][]float64)
for i := 0; i < len(connectedPeers); i++ {
p := connectedPeers[i]
for _, p := range connectedPeers {
pid, err := peer.Decode(p.String())
if err != nil {
log.WithError(err).Debug("Could not decode peer string")
@@ -193,16 +215,18 @@ func (s *Service) updateMetrics() {
}
foundName := agentFromPid(pid, store)
numConnectedPeersByClient[foundName] += 1
connectedPeersCountByClient[foundName] += 1
// Get peer scoring data.
overallScore := s.peers.Scorers().Score(pid)
peerScoresByClient[foundName] = append(peerScoresByClient[foundName], overallScore)
}
connectedPeersCount.Reset() // Clear out previous results.
for agent, total := range numConnectedPeersByClient {
for agent, total := range connectedPeersCountByClient {
connectedPeersCount.WithLabelValues(agent).Set(total)
}
avgScoreConnectedClients.Reset() // Clear out previous results.
for agent, scoringData := range peerScoresByClient {
avgScore := average(scoringData)

View File

@@ -25,6 +25,7 @@ package peers
import (
"context"
"net"
"slices"
"sort"
"strings"
"time"
@@ -81,29 +82,31 @@ const (
type InternetProtocol string
const (
TCP = "tcp"
QUIC = "quic"
TCP = InternetProtocol("tcp")
QUIC = InternetProtocol("quic")
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
type (
// Status is the structure holding the peer status information.
Status struct {
ctx context.Context
scorers *scorers.Service
store *peerdata.Store
ipTracker map[string]uint64
rand *rand.Rand
ipColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
type StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
// StatusConfig represents peer status service params.
StatusConfig struct {
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
PeerLimit int
// ScorerParams holds peer scorer configuration params.
ScorerParams *scorers.Config
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
IPColocationWhitelist []*net.IPNet
}
)
// NewStatus creates a new status entity.
func NewStatus(ctx context.Context, config *StatusConfig) *Status {
@@ -304,11 +307,8 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
connectedStatus := peerData.ConnState == Connecting || peerData.ConnState == Connected
if connectedStatus && peerData.MetaData != nil && !peerData.MetaData.IsNil() && peerData.MetaData.AttnetsBitfield() != nil {
indices := indicesFromBitfield(peerData.MetaData.AttnetsBitfield())
for _, idx := range indices {
if idx == index {
peers = append(peers, pid)
break
}
if slices.Contains(indices, index) {
peers = append(peers, pid)
}
}
}

View File

@@ -345,17 +345,17 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg)
}
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
// Check if the message is to be updated in fulu.
if epoch >= beaconConfig.FuluForkEpoch {
if epoch >= cfg.FuluForkEpoch {
if version, ok := fuluMapping[msg]; ok {
return protocolPrefix + msg + version, nil
}
}
// Check if the message is to be updated in altair.
if epoch >= beaconConfig.AltairForkEpoch {
if epoch >= cfg.AltairForkEpoch {
if version, ok := altairMapping[msg]; ok {
return protocolPrefix + msg + version, nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -64,35 +65,36 @@ var (
// Service for managing peer to peer (p2p) networking.
type Service struct {
started bool
isPreGenesis bool
pingMethod func(ctx context.Context, id peer.ID) error
pingMethodLock sync.RWMutex
cancel context.CancelFunc
cfg *Config
peers *peers.Status
addrFilter *multiaddr.Filters
ipLimiter *leakybucket.Collector
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
subnetsLockLock sync.Mutex // Lock access to subnetsLock
initializationLock sync.Mutex
dv5Listener ListenerRebooter
startupErr error
ctx context.Context
host host.Host
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
peerDisconnectionTime *cache.Cache
custodyInfo *custodyInfo
custodyInfoLock sync.RWMutex // Lock access to custodyInfo
custodyInfoSet chan struct{}
allForkDigests map[[4]byte]struct{}
started bool
isPreGenesis bool
pingMethod func(ctx context.Context, id peer.ID) error
pingMethodLock sync.RWMutex
cancel context.CancelFunc
cfg *Config
peers *peers.Status
addrFilter *multiaddr.Filters
ipLimiter *leakybucket.Collector
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
subnetsLockLock sync.Mutex // Lock access to subnetsLock
initializationLock sync.Mutex
dv5Listener ListenerRebooter
startupErr error
ctx context.Context
host host.Host
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
activeValidatorCountLock sync.Mutex
peerDisconnectionTime *cache.Cache
custodyInfo *custodyInfo
custodyInfoLock sync.RWMutex // Lock access to custodyInfo
custodyInfoSet chan struct{}
allForkDigests map[[4]byte]struct{}
}
type custodyInfo struct {
@@ -106,12 +108,16 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
cfg = validateConfig(cfg)
validateConfig(cfg)
privKey, err := privKey(cfg)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate p2p private key")
}
p2pMaxPeers.Set(float64(cfg.MaxPeers))
minimumPeersPerSubnet.Set(float64(flags.Get().MinimumPeersPerSubnet))
metaData, err := metaDataFromDB(ctx, cfg.DB)
if err != nil {
log.WithError(err).Error("Failed to create peer metadata")

View File

@@ -514,18 +514,18 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
//
// return [compute_subscribed_subnet(node_id, epoch, index) for index in range(SUBNETS_PER_NODE)]
func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeToAllSubnets {
subnets := make([]uint64, 0, beaconConfig.AttestationSubnetCount)
for i := range beaconConfig.AttestationSubnetCount {
subnets := make([]uint64, 0, cfg.AttestationSubnetCount)
for i := range cfg.AttestationSubnetCount {
subnets = append(subnets, i)
}
return subnets, nil
}
subnets := make([]uint64, 0, beaconConfig.SubnetsPerNode)
for i := range beaconConfig.SubnetsPerNode {
subnets := make([]uint64, 0, cfg.SubnetsPerNode)
for i := range cfg.SubnetsPerNode {
sub, err := computeSubscribedSubnet(nodeID, epoch, i)
if err != nil {
return nil, errors.Wrap(err, "compute subscribed subnet")

View File

@@ -524,12 +524,12 @@ func TestSubnetComputation(t *testing.T) {
require.NoError(t, err)
localNode := enode.NewLocalNode(db, convertedKey)
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
t.Run("standard", func(t *testing.T) {
retrievedSubnets, err := computeSubscribedSubnets(localNode.ID(), 1000)
require.NoError(t, err)
require.Equal(t, beaconConfig.SubnetsPerNode, uint64(len(retrievedSubnets)))
require.Equal(t, cfg.SubnetsPerNode, uint64(len(retrievedSubnets)))
require.Equal(t, retrievedSubnets[0]+1, retrievedSubnets[1])
})
@@ -541,8 +541,8 @@ func TestSubnetComputation(t *testing.T) {
retrievedSubnets, err := computeSubscribedSubnets(localNode.ID(), 1000)
require.NoError(t, err)
require.Equal(t, beaconConfig.AttestationSubnetCount, uint64(len(retrievedSubnets)))
for i := range beaconConfig.AttestationSubnetCount {
require.Equal(t, cfg.AttestationSubnetCount, uint64(len(retrievedSubnets)))
for i := range cfg.AttestationSubnetCount {
require.Equal(t, i, retrievedSubnets[i])
}
})

View File

@@ -68,7 +68,10 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
}
if defaultKeysExist {
log.WithField("filePath", defaultKeyPath).Info("Reading static P2P private key from a file. To generate a new random private key at every start, please remove this file.")
if !params.FuluEnabled() {
log.WithField("filePath", defaultKeyPath).Info("Reading static P2P private key from a file. To generate a new random private key at every start, please remove this file.")
}
return privKeyFromFile(defaultKeyPath)
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"slices"
"strconv"
"strings"
@@ -388,12 +389,9 @@ func syncRewardsVals(
scIndices := make([]primitives.ValidatorIndex, 0, len(allScIndices))
scVals := make([]*precompute.Validator, 0, len(allScIndices))
for _, valIdx := range valIndices {
for _, scIdx := range allScIndices {
if valIdx == scIdx {
scVals = append(scVals, allVals[valIdx])
scIndices = append(scIndices, valIdx)
break
}
if slices.Contains(allScIndices, valIdx) {
scVals = append(scVals, allVals[valIdx])
scIndices = append(scIndices, valIdx)
}
}

View File

@@ -312,14 +312,14 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
if errors.Is(err, builderapi.ErrBadGateway) {
log.WithError(err).Info("Optimistically proposed block - builder relay temporarily unavailable, block may arrive over P2P")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
if errors.Is(err, builderapi.ErrBadGateway) && block.IsBlinded() {
log.WithError(err).Info("Optimistically proposed block - builder relay temporarily unavailable, block may arrive over P2P")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}

View File

@@ -90,10 +90,10 @@ func (s *Service) updateCustodyInfoIfNeeded() error {
// custodyGroupCount computes the custody group count based on the custody requirement,
// the validators custody requirement, and whether the node is subscribed to all data subnets.
func (s *Service) custodyGroupCount(context.Context) (uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeAllDataSubnets {
return beaconConfig.NumberOfCustodyGroups, nil
return cfg.NumberOfCustodyGroups, nil
}
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
@@ -101,7 +101,7 @@ func (s *Service) custodyGroupCount(context.Context) (uint64, error) {
return 0, errors.Wrap(err, "validators custody requirement")
}
return max(beaconConfig.CustodyRequirement, validatorsCustodyRequirement), nil
return max(cfg.CustodyRequirement, validatorsCustodyRequirement), nil
}
// validatorsCustodyRequirements computes the custody requirements based on the

View File

@@ -116,11 +116,11 @@ func withSubscribeAllDataSubnets(t *testing.T, fn func()) {
func TestUpdateCustodyInfoIfNeeded(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.NumberOfCustodyGroups = 128
beaconConfig.CustodyRequirement = 4
beaconConfig.SamplesPerSlot = 8
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.NumberOfCustodyGroups = 128
cfg.CustodyRequirement = 4
cfg.SamplesPerSlot = 8
params.OverrideBeaconConfig(cfg)
t.Run("Skip update when actual custody count >= target", func(t *testing.T) {
setup := setupCustodyTest(t, false)
@@ -159,7 +159,7 @@ func TestUpdateCustodyInfoIfNeeded(t *testing.T) {
require.NoError(t, err)
const expectedSlot = primitives.Slot(100)
setup.assertCustodyInfo(t, expectedSlot, beaconConfig.NumberOfCustodyGroups)
setup.assertCustodyInfo(t, expectedSlot, cfg.NumberOfCustodyGroups)
})
})
}

View File

@@ -1366,16 +1366,16 @@ func TestFetchSidecars(t *testing.T) {
})
t.Run("Nominal", func(t *testing.T) {
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
samplesPerSlot := beaconConfig.SamplesPerSlot
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
samplesPerSlot := cfg.SamplesPerSlot
// Define "now" to be one epoch after genesis time + retention period.
genesisTime := time.Date(2025, time.August, 10, 0, 0, 0, 0, time.UTC)
secondsPerSlot := beaconConfig.SecondsPerSlot
slotsPerEpoch := beaconConfig.SlotsPerEpoch
secondsPerSlot := cfg.SecondsPerSlot
slotsPerEpoch := cfg.SlotsPerEpoch
secondsPerEpoch := uint64(slotsPerEpoch.Mul(secondsPerSlot))
retentionEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
retentionEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
nowWrtGenesisSecs := retentionEpochs.Add(1).Mul(secondsPerEpoch)
now := genesisTime.Add(time.Duration(nowWrtGenesisSecs) * time.Second)

View File

@@ -530,12 +530,12 @@ func TestOriginOutsideRetention(t *testing.T) {
func TestFetchOriginSidecars(t *testing.T) {
ctx := t.Context()
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
genesisTime := time.Date(2025, time.August, 10, 0, 0, 0, 0, time.UTC)
secondsPerSlot := beaconConfig.SecondsPerSlot
slotsPerEpoch := beaconConfig.SlotsPerEpoch
secondsPerSlot := cfg.SecondsPerSlot
slotsPerEpoch := cfg.SlotsPerEpoch
secondsPerEpoch := uint64(slotsPerEpoch.Mul(secondsPerSlot))
retentionEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
retentionEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
genesisValidatorRoot := [fieldparams.RootLength]byte{}

View File

@@ -192,6 +192,13 @@ var (
},
)
dataColumnsRecoveredFromELTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "data_columns_recovered_from_el_total",
Help: "Count the number of times data columns have been recovered from the execution layer.",
},
)
// Data column sidecar validation, beacon metrics specs
dataColumnSidecarVerificationRequestsCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_data_column_sidecar_processing_requests_total",
@@ -279,6 +286,7 @@ func (s *Service) updateMetrics() {
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(formattedTopic))))
}
subscribedTopicPeerCount.Reset()
for _, topic := range s.cfg.p2p.PubSub().GetTopics() {
subscribedTopicPeerCount.WithLabelValues(topic).Set(float64(len(s.cfg.p2p.PubSub().ListPeers(topic))))
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
@@ -382,10 +383,8 @@ func (s *Service) savePending(root [32]byte, pending any, isEqual func(other any
// Skip if the attestation/aggregate from the same validator already exists in
// the pending queue.
for _, a := range s.blkRootToPendingAtts[root] {
if isEqual(a) {
return
}
if slices.ContainsFunc(s.blkRootToPendingAtts[root], isEqual) {
return
}
pendingAttCount.Inc()

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
@@ -58,6 +59,17 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}
// Extract all needed roots.
roots := make([][fieldparams.RootLength]byte, 0, len(blobIdents))
for _, ident := range blobIdents {
root := bytesutil.ToBytes32(ident.BlockRoot)
roots = append(roots, root)
}
// Filter all available roots in block storage.
availableRoots := s.cfg.beaconDB.AvailableBlocks(ctx, roots)
// Serve each requested blob sidecar.
for i := range blobIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
@@ -69,7 +81,15 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
<-ticker.C
}
s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(blobIdents[i].BlockRoot), blobIdents[i].Index
// Do not serve a blob sidecar if the corresponding block is not available.
if !availableRoots[root] {
log.Trace("Peer requested blob sidecar by root but corresponding block not found in db")
continue
}
sc, err := s.cfg.blobStorage.Get(root, idx)
if err != nil {
log := log.WithFields(logrus.Fields{
@@ -113,19 +133,19 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
}
func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq, slot primitives.Slot) error {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
epoch := slots.ToEpoch(slot)
blobIdentCount := uint64(len(blobIdents))
if epoch >= beaconConfig.ElectraForkEpoch {
if blobIdentCount > beaconConfig.MaxRequestBlobSidecarsElectra {
if epoch >= cfg.ElectraForkEpoch {
if blobIdentCount > cfg.MaxRequestBlobSidecarsElectra {
return types.ErrMaxBlobReqExceeded
}
return nil
}
if blobIdentCount > beaconConfig.MaxRequestBlobSidecars {
if blobIdentCount > cfg.MaxRequestBlobSidecars {
return types.ErrMaxBlobReqExceeded
}
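
The availability check added to blobSidecarByRootRPCHandler above is a batch pattern: collect every requested root, query block storage once, then consult the result per identifier. A hedged sketch, where availableBlocks is a stand-in assumed to behave like the beacon DB's AvailableBlocks lookup:

package main

import "fmt"

type root [32]byte

// availableBlocks stands in for the beacon DB call: for each requested
// root, report whether the corresponding block is stored.
func availableBlocks(stored map[root]bool, roots []root) map[root]bool {
	out := make(map[root]bool, len(roots))
	for _, r := range roots {
		out[r] = stored[r]
	}
	return out
}

func main() {
	stored := map[root]bool{{1}: true}
	requested := []root{{1}, {2}}
	available := availableBlocks(stored, requested)
	for _, r := range requested {
		if !available[r] {
			fmt.Printf("skipping sidecar for root %d: block not in db\n", r[0])
			continue
		}
		fmt.Printf("serving sidecar for root %d\n", r[0])
	}
}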


@@ -38,8 +38,8 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
defer cancel()
SetRPCStreamDeadlines(stream)
beaconConfig := params.BeaconConfig()
maxRequestDataColumnSidecars := beaconConfig.MaxRequestDataColumnSidecars
cfg := params.BeaconConfig()
maxRequestDataColumnSidecars := cfg.MaxRequestDataColumnSidecars
remotePeer := stream.Conn().RemotePeer()
log := log.WithFields(logrus.Fields{
@@ -102,7 +102,7 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
// Once the quota is reached, we're done serving the request.
if maxRequestDataColumnSidecars == 0 {
log.WithField("initialQuota", beaconConfig.MaxRequestDataColumnSidecars).Trace("Reached quota for data column sidecars by range request")
log.WithField("initialQuota", cfg.MaxRequestDataColumnSidecars).Trace("Reached quota for data column sidecars by range request")
break
}
}


@@ -31,9 +31,9 @@ import (
func TestDataColumnSidecarsByRangeRPCHandler(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctx := context.Background()
t.Run("wrong message type", func(t *testing.T) {


@@ -56,18 +56,6 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrap(err, "validate data columns by root request")
}
requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte][]uint64)
for _, columnIdent := range requestedColumnIdents {
var root [fieldparams.RootLength]byte
copy(root[:], columnIdent.BlockRoot)
requestedColumnsByRoot[root] = append(requestedColumnsByRoot[root], columnIdent.Columns...)
}
// Sort by column index for each root.
for _, columns := range requestedColumnsByRoot {
slices.Sort(columns)
}
// Compute the oldest slot we'll allow a peer to request, based on the current slot.
minReqSlot, err := dataColumnsRPCMinValidSlot(s.cfg.clock.CurrentSlot())
if err != nil {
@@ -84,6 +72,12 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
if log.Logger.Level >= logrus.TraceLevel {
requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte][]uint64)
for _, ident := range requestedColumnIdents {
root := bytesutil.ToBytes32(ident.BlockRoot)
requestedColumnsByRoot[root] = append(requestedColumnsByRoot[root], ident.Columns...)
}
// We optimistically assume the peer requests the same set of columns for all roots,
// pre-sizing the map accordingly.
requestedRootsByColumnSet := make(map[string][]string, 1)
@@ -96,6 +90,17 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
log.WithField("requested", requestedRootsByColumnSet).Trace("Serving data column sidecars by root")
}
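
The trace logging above groups requested roots by their column set, so a peer asking for identical columns across many roots produces one compact entry. A sketch of that grouping, keying the map by the joined column list (request shape assumed, not Prysm's actual types):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Assumed request shape: block root -> requested column indices.
	requested := map[string][]uint64{
		"0xaa": {1, 2, 3},
		"0xbb": {1, 2, 3},
		"0xcc": {7},
	}
	// Identical column sets collapse into a single map entry listing
	// every root that requested them.
	rootsByColumnSet := make(map[string][]string, 1)
	for blockRoot, columns := range requested {
		parts := make([]string, 0, len(columns))
		for _, c := range columns {
			parts = append(parts, fmt.Sprintf("%d", c))
		}
		key := strings.Join(parts, ",")
		rootsByColumnSet[key] = append(rootsByColumnSet[key], blockRoot)
	}
	fmt.Println(rootsByColumnSet) // e.g. map[1,2,3:[0xaa 0xbb] 7:[0xcc]]
}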
// Extract all requested roots.
roots := make([][fieldparams.RootLength]byte, 0, len(requestedColumnIdents))
for _, ident := range requestedColumnIdents {
root := bytesutil.ToBytes32(ident.BlockRoot)
roots = append(roots, root)
}
// Filter all available roots in block storage.
availableRoots := s.cfg.beaconDB.AvailableBlocks(ctx, roots)
// Serve each requested data column sidecar.
count := 0
for _, ident := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
@@ -117,6 +122,12 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
s.rateLimiter.add(stream, int64(len(columns)))
// Do not serve a data column sidecar if the corresponding block is not available.
if !availableRoots[root] {
log.Trace("Peer requested blob sidecar by root but corresponding block not found in db")
continue
}
// Retrieve the requested sidecars from the store.
verifiedRODataColumns, err := s.cfg.dataColumnStorage.Get(root, columns)
if err != nil {
@@ -163,9 +174,9 @@ func dataColumnsRPCMinValidSlot(currentSlot primitives.Slot) (primitives.Slot, e
return primitives.Slot(math.MaxUint64), nil
}
beaconConfig := params.BeaconConfig()
minReqEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := beaconConfig.FuluForkEpoch
cfg := params.BeaconConfig()
minReqEpochs := cfg.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := cfg.FuluForkEpoch
currEpoch := slots.ToEpoch(currentSlot)
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch {
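
The clamp above makes the minimum requestable epoch the later of the Fulu fork epoch and currentEpoch minus MinEpochsForDataColumnSidecarsRequest. A worked sketch with assumed constants:

package main

import "fmt"

func main() {
	const (
		minReqEpochs  = uint64(4096) // assumed MinEpochsForDataColumnSidecarsRequest
		minStartEpoch = uint64(100)  // assumed FuluForkEpoch
	)
	for _, currEpoch := range []uint64{50, 2000, 10000} {
		minEpoch := minStartEpoch
		// Only reach back a full retention window once the chain has
		// progressed far enough past the fork epoch.
		if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch {
			minEpoch = currEpoch - minReqEpochs
		}
		fmt.Printf("current epoch %d -> min requestable epoch %d\n", currEpoch, minEpoch)
	}
}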


@@ -10,6 +10,7 @@ import (
chainMock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
@@ -19,6 +20,7 @@ import (
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/libp2p/go-libp2p/core/network"
@@ -28,9 +30,9 @@ import (
func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -43,9 +45,9 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
t.Run("invalid request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 1
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 1
params.OverrideBeaconConfig(cfg)
localP2P := p2ptest.NewTestP2P(t)
service := &Service{cfg: &config{p2p: localP2P}}
@@ -96,30 +98,54 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
}()
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 1
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 1
params.OverrideBeaconConfig(cfg)
localP2P := p2ptest.NewTestP2P(t)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
params := []util.DataColumnParam{
{Slot: 10, Index: 1}, {Slot: 10, Index: 2}, {Slot: 10, Index: 3},
{Slot: 40, Index: 4}, {Slot: 40, Index: 6},
{Slot: 45, Index: 7}, {Slot: 45, Index: 8}, {Slot: 45, Index: 9},
_, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(
t,
[]util.DataColumnParam{
{Slot: 10, Index: 1}, {Slot: 10, Index: 2}, {Slot: 10, Index: 3},
{Slot: 40, Index: 4}, {Slot: 40, Index: 6},
{Slot: 45, Index: 7}, {Slot: 45, Index: 8}, {Slot: 45, Index: 9},
{Slot: 46, Index: 10}, // Corresponding block won't be saved in DB
},
)
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
err := dataColumnStorage.Save(verifiedRODataColumns)
require.NoError(t, err)
beaconDB := testDB.SetupDB(t)
indices := [...]int{0, 3, 5}
roBlocks := make([]blocks.ROBlock, 0, len(indices))
for _, i := range indices {
blockPb := util.NewBeaconBlock()
signedBeaconBlock, err := blocks.NewSignedBeaconBlock(blockPb)
require.NoError(t, err)
// Here the block root has to match the sidecar's block root.
// (It does not match the block's actual hash tree root, but that does not matter for this test.)
roBlock, err := blocks.NewROBlockWithRoot(signedBeaconBlock, verifiedRODataColumns[i].BlockRoot())
require.NoError(t, err)
roBlocks = append(roBlocks, roBlock)
}
_, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, params)
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(verifiedRODataColumns)
err = beaconDB.SaveROBlocks(ctx, roBlocks, false /*cache*/)
require.NoError(t, err)
service := &Service{
cfg: &config{
p2p: localP2P,
beaconDB: beaconDB,
clock: clock,
dataColumnStorage: storage,
dataColumnStorage: dataColumnStorage,
chain: &chainMock.ChainService{},
},
rateLimiter: newRateLimiter(localP2P),
@@ -134,6 +160,7 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
root0 := verifiedRODataColumns[0].BlockRoot()
root3 := verifiedRODataColumns[3].BlockRoot()
root5 := verifiedRODataColumns[5].BlockRoot()
root8 := verifiedRODataColumns[8].BlockRoot()
remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
@@ -147,22 +174,22 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
break
}
require.NoError(t, err)
assert.NoError(t, err)
sidecars = append(sidecars, sidecar)
}
require.Equal(t, 5, len(sidecars))
require.Equal(t, root3, sidecars[0].BlockRoot())
require.Equal(t, root3, sidecars[1].BlockRoot())
require.Equal(t, root5, sidecars[2].BlockRoot())
require.Equal(t, root5, sidecars[3].BlockRoot())
require.Equal(t, root5, sidecars[4].BlockRoot())
assert.Equal(t, 5, len(sidecars))
assert.Equal(t, root3, sidecars[0].BlockRoot())
assert.Equal(t, root3, sidecars[1].BlockRoot())
assert.Equal(t, root5, sidecars[2].BlockRoot())
assert.Equal(t, root5, sidecars[3].BlockRoot())
assert.Equal(t, root5, sidecars[4].BlockRoot())
require.Equal(t, uint64(4), sidecars[0].Index)
require.Equal(t, uint64(6), sidecars[1].Index)
require.Equal(t, uint64(7), sidecars[2].Index)
require.Equal(t, uint64(8), sidecars[3].Index)
require.Equal(t, uint64(9), sidecars[4].Index)
assert.Equal(t, uint64(4), sidecars[0].Index)
assert.Equal(t, uint64(6), sidecars[1].Index)
assert.Equal(t, uint64(7), sidecars[2].Index)
assert.Equal(t, uint64(8), sidecars[3].Index)
assert.Equal(t, uint64(9), sidecars[4].Index)
})
localP2P.Connect(remoteP2P)
@@ -182,6 +209,10 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
BlockRoot: root5[:],
Columns: []uint64{7, 8, 9},
},
{
BlockRoot: root8[:],
Columns: []uint64{10},
},
}
err = service.dataColumnSidecarByRootRPCHandler(ctx, msg, stream)


@@ -465,8 +465,8 @@ func SendDataColumnSidecarsByRangeRequest(
return nil, nil
}
beaconConfig := params.BeaconConfig()
numberOfColumns := beaconConfig.NumberOfColumns
cfg := params.BeaconConfig()
numberOfColumns := cfg.NumberOfColumns
maxRequestDataColumnSidecars := params.BeaconConfig().MaxRequestDataColumnSidecars
// Check if we do not request too many sidecars.


@@ -889,9 +889,9 @@ func TestErrInvalidFetchedDataDistinction(t *testing.T) {
func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -923,9 +923,9 @@ func TestSendDataColumnSidecarsByRangeRequest(t *testing.T) {
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 0
params.OverrideBeaconConfig(cfg)
request := &ethpb.DataColumnSidecarsByRangeRequest{Count: 1, Columns: []uint64{1, 2, 3}}
_, err := SendDataColumnSidecarsByRangeRequest(DataColumnSidecarsParams{Ctx: t.Context()}, "", request)
@@ -1193,9 +1193,9 @@ func TestIsSidecarIndexRequested(t *testing.T) {
func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 0
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
require.NoError(t, err)
@@ -1223,9 +1223,9 @@ func TestSendDataColumnSidecarsByRootRequest(t *testing.T) {
t.Run("too many columns in request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(beaconConfig)
cfg := params.BeaconConfig()
cfg.MaxRequestDataColumnSidecars = 4
params.OverrideBeaconConfig(cfg)
request := p2ptypes.DataColumnsByRootIdentifiers{
{Columns: []uint64{1, 2, 3}},


@@ -445,7 +445,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
custodyGroupCount = uint64(4)
)
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
ctx := t.Context()
testCases := []struct {
@@ -456,7 +456,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
}{
{
name: "before fulu",
fuluForkEpoch: beaconConfig.FarFutureEpoch,
fuluForkEpoch: cfg.FarFutureEpoch,
topic: "/eth2/beacon_chain/req/status/1/ssz_snappy",
streamHandler: func(service *Service, stream network.Stream, genesisState beaconState.BeaconState, beaconRoot, headRoot, finalizedRoot []byte) {
out := &ethpb.Status{}

View File

@@ -695,10 +695,10 @@ func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool {
// the validators custody requirement, and whether the node is subscribed to all data subnets.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
func (s *Service) samplingSize() (uint64, error) {
beaconConfig := params.BeaconConfig()
cfg := params.BeaconConfig()
if flags.Get().SubscribeAllDataSubnets {
return beaconConfig.DataColumnSidecarSubnetCount, nil
return cfg.DataColumnSidecarSubnetCount, nil
}
// Compute the validators custody requirement.
@@ -712,7 +712,7 @@ func (s *Service) samplingSize() (uint64, error) {
return 0, errors.Wrap(err, "custody group count")
}
return max(beaconConfig.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
return max(cfg.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
}
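
Put differently: unless --subscribe-all-data-subnets forces the full subnet count, the sampling size is simply the largest of the three requirements. A worked sketch of that max rule with made-up values:

package main

import "fmt"

func main() {
	const (
		samplesPerSlot               = 8  // assumed cfg.SamplesPerSlot
		validatorsCustodyRequirement = 16 // assumed, derived from attached validators
		custodyGroupCount            = 4  // assumed node custody group count
	)
	// The node samples the largest of the three requirements.
	fmt.Println(max(samplesPerSlot, validatorsCustodyRequirement, custodyGroupCount)) // 16
}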
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) map[uint64]bool {


@@ -224,6 +224,8 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
}
if len(unseenIndices) > 0 {
dataColumnsRecoveredFromELTotal.Inc()
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),


@@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
@@ -290,11 +291,8 @@ func (s *Service) validateIndexInCommittee(ctx context.Context, a ethpb.Att, val
}
var withinCommittee bool
for _, i := range committee {
if validatorIndex == i {
withinCommittee = true
break
}
if slices.Contains(committee, validatorIndex) {
withinCommittee = true
}
if !withinCommittee {
return pubsub.ValidationReject, fmt.Errorf("validator index %d is not within the committee: %v",


@@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
"slices"
"strings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
@@ -26,6 +27,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Validation
@@ -294,7 +296,19 @@ func validateAttesterData(
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, a)
}
return validateAttestingIndex(ctx, singleAtt.AttesterIndex, committee)
status, err := validateAttestingIndex(ctx, singleAtt.AttesterIndex, committee)
if status == pubsub.ValidationReject {
log.WithFields(logrus.Fields{
"targetRoot": fmt.Sprintf("%#x", a.GetData().Target.Root),
"targetEpoch": a.GetData().Target.Epoch,
"sourceRoot": fmt.Sprintf("%#x", a.GetData().Source.Root),
"sourceEpoch": a.GetData().Source.Epoch,
"blockRoot": fmt.Sprintf("%#x", a.GetData().BeaconBlockRoot),
"slot": fmt.Sprintf("%#x", a.GetData().Slot),
}).Info("Bad attestation")
}
return status, err
}
// Verify number of aggregation bits matches the committee size.
@@ -336,17 +350,10 @@ func validateAttestingIndex(
// _[REJECT]_ The attester is a member of the committee -- i.e.
// `attestation.attester_index in get_beacon_committee(state, attestation.data.slot, index)`.
inCommittee := false
for _, ix := range committee {
if attestingIndex == ix {
inCommittee = true
break
}
}
inCommittee := slices.Contains(committee, attestingIndex)
if !inCommittee {
return pubsub.ValidationReject, errors.New("attester is not a member of the committee")
}
return pubsub.ValidationAccept, nil
}


@@ -1,3 +0,0 @@
### Fixed
- Fix `recoverStateSummary` to persist state summaries in `stateSummaryBucket` instead of `stateBucket` (#15896).


@@ -0,0 +1,3 @@
### Ignored
- Add log prefix to the light-client package.


@@ -0,0 +1,3 @@
### Added
- Added a `GeneralizedIndicesFromPath` function to calculate the generalized indices for a given `sszInfo` object and a `PathElement`.


@@ -0,0 +1,3 @@
### Changed
- Introduced `Path` type for SSZ-QL queries and updated `PathElement` (removed the `Length` field, kept `Index`), enforcing that `len` queries are terminal (at most one per path).
- Changed the length query syntax from `block.payload.len(transactions)` to `len(block.payload.transactions)`.


@@ -0,0 +1,3 @@
### Removed
- Log mentioning the removed flag `--show-deposit-data`.


@@ -1,3 +0,0 @@
### Ignored
- Changelog entries for v6.1.3 through v6.1.2


@@ -0,0 +1,3 @@
### Ignored
- Changelog entries for v6.1.4 through v6.1.3


@@ -1,2 +0,0 @@
### Fixed
- Delete the genesis state file when --clear-db / --force-clear-db is specified.


@@ -0,0 +1,2 @@
### Changed
- Use the `by-epoch` blob storage layout by default and log a warning to users who continue to use the flat layout, encouraging them to switch.


@@ -0,0 +1,2 @@
### Ignored
- Fix a bug in layout detection when `Readdirnames` returns `io.EOF`.


@@ -1,2 +0,0 @@
### Fixed
- Correctly advertise (in ENR and beacon API) attestation subnets when using `--subscribe-all-subnets`.


@@ -0,0 +1,3 @@
### Fixed
- `blobSidecarByRootRPCHandler`: Do not serve a sidecar if the corresponding block is not available.
- `dataColumnSidecarByRootRPCHandler`: Do not serve a sidecar if the corresponding block is not available.


@@ -0,0 +1,2 @@
### Ignored
- `BeaconBlockContainerToSignedBeaconBlock`: Add Fulu.


@@ -0,0 +1,2 @@
### Fixed
- Remove `Reading static P2P private key from a file.` log if Fulu is enabled.


@@ -0,0 +1,2 @@
### Changed
- Update `go-netroute` to `v0.4.0`


@@ -0,0 +1,2 @@
### Changed
- Update `go-netroute` to `v0.3.0`


@@ -0,0 +1,4 @@
### Added
- Metrics: Add a count of peers per direction (inbound/outbound) and transport type (TCP/QUIC).
- `p2p_subscribed_topic_peer_total`: Reset to avoid dangling values.
- Add `p2p_minimum_peers_per_subnet` metric.


@@ -1,2 +0,0 @@
### Fixed
- `updateCustodyInfoInDB`: Use `NumberOfCustodyGroups` instead of `NumberOfColumns`.


@@ -1,2 +0,0 @@
### Fixed
- `randomPeer`: Return if the context is cancelled when waiting for peers.


@@ -1,2 +0,0 @@
### Fixed
- Improve the error message when the byte count read from disk while reading a data column sidecar is lower than expected (usually because the file is truncated).


@@ -1,2 +0,0 @@
### Fixed
- `VerifyDataColumnsSidecarKZGProofs`: Check if sizes match.


@@ -0,0 +1,2 @@
### Fixed
- Fix incorrect version used when sending attestations in Fulu.


@@ -1,2 +0,0 @@
### Fixed
- Fixed metadata extraction on Windows by correctly splitting file paths


@@ -1,3 +0,0 @@
### Added
- Add native state diff type and marshalling functions


@@ -0,0 +1,4 @@
### Fixed
- Changed the behavior of topic subscriptions such that only topics that require the active validator count will compute that value.
- Added a mutex around the active validator count computation during topic subscription to avoid a race where multiple goroutines compute the same value.


@@ -0,0 +1,3 @@
### Added
- Allow custom headers in validator client HTTP requests.


@@ -0,0 +1,3 @@
### Ignored
- Use `slices.Contains` to simplify code.


@@ -1,3 +0,0 @@
### Added
- Update the earliest available slot after pruning operations in the beacon chain database pruner. This ensures the P2P layer accurately knows which historical data is available after pruning, preventing nodes from advertising or attempting to serve data that has been pruned.


@@ -1,3 +0,0 @@
### Added
- SSZ-QL: Add endpoints for `BeaconState`/`BeaconBlock`.


@@ -0,0 +1,3 @@
### Added
- Add a metric to track data columns recovered from the execution layer.


@@ -0,0 +1,3 @@
### Ignored
- Return optimistic response only when handling blinded blocks in proposer


@@ -1,3 +0,0 @@
### Fixed
- Sync committee uses correct state to calculate position


@@ -1,3 +0,0 @@
### Fixed
- Fix sync committee subscription to use subnet indices instead of committee indices


@@ -0,0 +1,3 @@
### Changed
- Updated consensus spec tests to v1.6.0-beta.1 with new hashes and URL template


@@ -0,0 +1,3 @@
### Ignored
- Use `SlotTicker` with an offset instead of `time.Ticker` for attestation pool pruning to avoid conflicts with slot boundary operations.


@@ -12,6 +12,7 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)
@@ -19,8 +20,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["options_test.go"],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/filesystem:go_default_library",
"//cmd:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",


@@ -1,7 +1,11 @@
package storage
import (
"fmt"
"io"
"os"
"path"
"slices"
"strings"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
@@ -10,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
@@ -25,9 +30,9 @@ var (
Aliases: []string{"extend-blob-retention-epoch"},
}
BlobStorageLayout = &cli.StringFlag{
Name: "blob-storage-layout",
Usage: layoutFlagUsage(),
Value: filesystem.LayoutNameFlat,
Name: "blob-storage-layout",
Usage: layoutFlagUsage(),
DefaultText: fmt.Sprintf("\"%s\", unless a different existing layout is detected", filesystem.LayoutNameByEpoch),
}
DataColumnStoragePathFlag = &cli.PathFlag{
Name: "data-column-path",
@@ -35,6 +40,14 @@ var (
}
)
// Flags is the list of CLI flags for configuring blob storage.
var Flags = []cli.Flag{
BlobStoragePathFlag,
BlobRetentionEpochFlag,
BlobStorageLayout,
DataColumnStoragePathFlag,
}
func layoutOptions() string {
return "available options are: " + strings.Join(filesystem.LayoutNames, ", ") + "."
}
@@ -44,10 +57,8 @@ func layoutFlagUsage() string {
}
func validateLayoutFlag(_ *cli.Context, v string) error {
for _, l := range filesystem.LayoutNames {
if v == l {
return nil
}
if slices.Contains(filesystem.LayoutNames, v) {
return nil
}
return errors.Errorf("invalid value '%s' for flag --%s, %s", v, BlobStorageLayout.Name, layoutOptions())
}
@@ -62,10 +73,20 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
return nil, errors.Wrap(err, "blob retention epoch")
}
blobPath := blobStoragePath(c)
layout, err := detectLayout(blobPath, c)
if err != nil {
return nil, errors.Wrap(err, "detecting blob storage layout")
}
if layout == filesystem.LayoutNameFlat {
log.Warnf("Existing '%s' blob storage layout detected. Consider setting the flag --%s=%s for faster startup and more reliable pruning. Setting this flag will automatically migrate your existing blob storage to the newer layout on the next restart.",
filesystem.LayoutNameFlat, BlobStorageLayout.Name, filesystem.LayoutNameByEpoch)
}
blobStorageOptions := node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(blobRetentionEpoch),
filesystem.WithBasePath(blobStoragePath(c)),
filesystem.WithLayout(c.String(BlobStorageLayout.Name)), // This is validated in the Action func for BlobStorageLayout.
filesystem.WithBasePath(blobPath),
filesystem.WithLayout(layout), // This is validated in the Action func for BlobStorageLayout.
)
dataColumnRetentionEpoch, err := dataColumnRetentionEpoch(c)
@@ -82,6 +103,57 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
return opts, nil
}
// stringFlagGetter makes testing detectLayout easier
// because we don't need to mess with FlagSets and cli types.
type stringFlagGetter interface {
String(name string) string
}
// detectLayout determines which layout to use based on explicit user flags or by probing the
// blob directory to determine the previously used layout.
// - explicit: If the user has specified a layout flag, that layout is returned.
// - flat: If directories that look like flat layout's block root paths are present.
// - by-epoch: default if neither of the above is true.
func detectLayout(dir string, c stringFlagGetter) (string, error) {
explicit := c.String(BlobStorageLayout.Name)
if explicit != "" {
return explicit, nil
}
dir = path.Clean(dir)
// nosec: this path is provided by the node operator via flag
base, err := os.Open(dir) // #nosec G304
if err != nil {
// 'blobs' directory does not exist yet, so default to by-epoch.
return filesystem.LayoutNameByEpoch, nil
}
defer func() {
if err := base.Close(); err != nil {
log.WithError(err).Error("Could not close blob storage directory")
}
}()
// When we go looking for existing by-root directories, we only need to find one directory
// but one of those directories could be the `by-epoch` layout's top-level directory,
// and it seems possible that on some platforms we could get extra system directories that I don't
// know how to anticipate (looking at you, Windows), so I picked 16 as a small number with a generous
// amount of wiggle room to be confident that we'll likely see a by-root directory if one exists.
entries, err := base.Readdirnames(16)
if err != nil {
// We can get this error if the directory exists and is empty
if errors.Is(err, io.EOF) {
return filesystem.LayoutNameByEpoch, nil
}
return "", errors.Wrap(err, "reading blob storage directory")
}
for _, entry := range entries {
if filesystem.IsBlockRootDir(entry) {
return filesystem.LayoutNameFlat, nil
}
}
return filesystem.LayoutNameByEpoch, nil
}
func blobStoragePath(c *cli.Context) string {
blobsPath := c.Path(BlobStoragePathFlag.Name)
if blobsPath == "" {


@@ -3,8 +3,14 @@ package storage
import (
"flag"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"syscall"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/cmd"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -109,3 +115,112 @@ func TestDataColumnStoragePath_FlagSpecified(t *testing.T) {
assert.Equal(t, "/blah/blah", storagePath)
}
type mockStringFlagGetter struct {
v string
}
func (m mockStringFlagGetter) String(name string) string {
return m.v
}
func TestDetectLayout(t *testing.T) {
fakeRoot := "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
require.Equal(t, true, filesystem.IsBlockRootDir(fakeRoot))
withFlatRoot := func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(path.Join(dir, fakeRoot), 0o755))
}
withByEpoch := func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(path.Join(dir, filesystem.PeriodicEpochBaseDir), 0o755))
}
cases := []struct {
name string
expected string
expectedErr error
setup func(t *testing.T, dir string)
getter mockStringFlagGetter
}{
{
name: "no blobs dir",
expected: filesystem.LayoutNameByEpoch,
},
{
name: "blobs dir without root dirs",
expected: filesystem.LayoutNameByEpoch,
// empty subdirectory under blobs which doesn't match the block root pattern
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(path.Join(dir, "some-dir"), 0o755))
},
},
{
name: "blobs dir with root dir",
setup: withFlatRoot,
expected: filesystem.LayoutNameFlat,
},
{
name: "blobs dir with root dir overridden by flag",
setup: withFlatRoot,
expected: filesystem.LayoutNameByEpoch,
getter: mockStringFlagGetter{v: filesystem.LayoutNameByEpoch},
},
{
name: "only has by-epoch dir",
setup: withByEpoch,
expected: filesystem.LayoutNameByEpoch,
},
{
name: "contains by-epoch dir and root dirs",
setup: func(t *testing.T, dir string) {
withFlatRoot(t, dir)
withByEpoch(t, dir)
},
expected: filesystem.LayoutNameFlat,
},
{
name: "unreadable dir",
// It isn't detectLayout's job to detect errors reading the directory,
// so it ignores errors from the os.Open call. We can also get errors
// from Readdirnames, but those are hard to simulate in a test. So the test
// writes a file in place of the dir, which succeeds in the Open call but
// fails when read as a directory. This is why the expected error is syscall.ENOTDIR
// (the syscall error returned when Readdirnames is used on an ordinary file).
setup: func(t *testing.T, dir string) {
parent := filepath.Dir(dir)
require.NoError(t, os.MkdirAll(parent, 0o755))
require.NoError(t, os.WriteFile(dir, []byte{}, 0o755))
},
expectedErr: syscall.ENOTDIR,
},
{
name: "empty blobs dir",
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(dir, 0o755))
},
expected: filesystem.LayoutNameByEpoch,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
dir := strings.Replace(t.Name(), " ", "_", -1)
dir = path.Join(os.TempDir(), dir)
if tc.setup != nil {
tc.setup(t, dir)
}
layout, err := detectLayout(dir, tc.getter)
if tc.expectedErr != nil {
require.ErrorIs(t, err, tc.expectedErr)
return
}
require.NoError(t, err)
require.Equal(t, tc.expected, layout)
assert.Equal(t, tc.expectedErr, err)
assert.Equal(t, tc.expected, layout)
})
}
}


@@ -4,6 +4,7 @@ package flags
import (
"fmt"
"slices"
"strings"
"github.com/urfave/cli/v2"
@@ -19,11 +20,9 @@ type EnumValue struct {
}
func (e *EnumValue) Set(value string) error {
for _, enum := range e.Enum {
if enum == value {
*e.Destination = value
return nil
}
if slices.Contains(e.Enum, value) {
*e.Destination = value
return nil
}
return fmt.Errorf("allowed values are %s", strings.Join(e.Enum, ", "))


@@ -45,6 +45,13 @@ var (
Usage: "Beacon node REST API provider endpoint.",
Value: "http://127.0.0.1:3500",
}
// BeaconRESTApiHeaders defines a list of headers to send with all HTTP requests to the beacon node.
BeaconRESTApiHeaders = &cli.StringFlag{
Name: "beacon-rest-api-headers",
Usage: `Comma-separated list of key value pairs to pass as headers for all HTTP calls to the beacon node.
To provide multiple values for the same key, specify the same key for each value.
Example: --beacon-rest-api-headers=key1=value1,key1=value2,key2=value3`,
}
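
The flag takes a comma-separated list of key=value pairs in which a key may repeat. A hedged sketch of parsing that format; parseHeaders is a hypothetical helper, not the validator client's actual parsing code:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func parseHeaders(raw string) http.Header {
	h := make(http.Header)
	for _, pair := range strings.Split(raw, ",") {
		if k, v, ok := strings.Cut(pair, "="); ok {
			h.Add(k, v) // Add (not Set) preserves repeated keys.
		}
	}
	return h
}

func main() {
	h := parseHeaders("key1=value1,key1=value2,key2=value3")
	fmt.Println(h.Values("key1")) // [value1 value2]
}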
// CertFlag defines a flag for the node's TLS certificate.
CertFlag = &cli.StringFlag{
Name: "tls-cert",


@@ -51,6 +51,7 @@ func startNode(ctx *cli.Context) error {
var appFlags = []cli.Flag{
flags.BeaconRPCProviderFlag,
flags.BeaconRESTApiProviderFlag,
flags.BeaconRESTApiHeaders,
flags.CertFlag,
flags.GraffitiFlag,
flags.DisablePenaltyRewardLogFlag,


@@ -93,6 +93,7 @@ var appHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
flags.CertFlag,
flags.BeaconRPCProviderFlag,
flags.BeaconRESTApiHeaders,
flags.EnableRPCFlag,
flags.RPCHost,
flags.RPCPort,


@@ -640,6 +640,10 @@ func BuildSignedBeaconBlockFromExecutionPayload(blk interfaces.ReadOnlySignedBea
// This is particularly useful for using the values from API calls.
func BeaconBlockContainerToSignedBeaconBlock(obj *eth.BeaconBlockContainer) (interfaces.ReadOnlySignedBeaconBlock, error) {
switch obj.Block.(type) {
case *eth.BeaconBlockContainer_BlindedFuluBlock:
return NewSignedBeaconBlock(obj.GetBlindedFuluBlock())
case *eth.BeaconBlockContainer_FuluBlock:
return NewSignedBeaconBlock(obj.GetFuluBlock())
case *eth.BeaconBlockContainer_BlindedElectraBlock:
return NewSignedBeaconBlock(obj.GetBlindedElectraBlock())
case *eth.BeaconBlockContainer_ElectraBlock:


@@ -268,20 +268,16 @@ func (s *Slice[V]) At(obj Identifiable, index uint64) (V, error) {
return s.sharedItems[index], nil
}
for _, v := range ind.Values {
for _, id := range v.ids {
if id == obj.Id() {
return v.val, nil
}
if slices.Contains(v.ids, obj.Id()) {
return v.val, nil
}
}
return s.sharedItems[index], nil
} else {
item := s.appendedItems[index-uint64(len(s.sharedItems))]
for _, v := range item.Values {
for _, id := range v.ids {
if id == obj.Id() {
return v.val, nil
}
if slices.Contains(v.ids, obj.Id()) {
return v.val, nil
}
}
var def V
