Compare commits

..

15 Commits

Author SHA1 Message Date
terence
6c045083a6 gloas: add modified attestation processing (#15736)
This PR implements
[process_attestation](https://github.com/ethereum/consensus-specs/blob/master/specs/gloas/beacon-chain.md#modified-process_attestation)
alongside spec tests

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2026-02-11 23:22:42 +00:00
james-prysm
09d0338aa9 adding db functions for saving gloas block and payload (#16301)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

gloas doesn't have the concept of blinded block anymore so instead we
save the full gloas block. that being said the full block does not
contain the payload envelope so there are separate functions for saving
those. this pr introduces these types and functions, the payload
envelope doesn't actually get saved yet in this pr.

a TODO comment is added for pruning as well

references epbs branch

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-11 17:59:25 +00:00
james-prysm
eaf3aa3e8e simplify get and post block parsing for REST (#16307)
**What type of PR is this?**

 Feature


**What does this PR do? Why is it needed?**

PR is attempts to remove code duplication and process through a map of
configurations for get and post block apis. this should simplify
maintainability.

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-10 18:28:55 +00:00
terence
9c49bb484c Add gossip for payload attestation (#16333)
This PR implements gossip validation and subscription for payload
attestation
2026-02-10 16:17:22 +00:00
terence
bb0f70ad60 gloas: add read only wrapper for payload envelope (#16339)
This PR adds read only wrapper for execution payload envelope
2026-02-09 17:23:12 +00:00
satushh
dc66f8872d Close libp2p host (#16313)
**What type of PR is this?**

 Other

**What does this PR do? Why is it needed?**

Close host to save resource

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [ ] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [ ] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Bastin <43618253+Inspector-Butters@users.noreply.github.com>
2026-02-09 14:40:07 +00:00
Preston Van Loon
db2bb5505c db: Copy byte slices that live outside of the view transaction (#16332)
**What type of PR is this?**

Bug fix

**What does this PR do? Why is it needed?**

The bbolt documentation suggests that the byte slices used during the
View transaction should not be used outside of the transaction and
mutating those slices could break things.

**Which issues(s) does this PR fix?**

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-07 14:10:07 +00:00
terence
14f01bbc6c gloas: move kzg commitments to bid (#16309)
This PR moves kzg commitments to bid. The rationale behind is captured
in this [issue](https://github.com/ethereum/consensus-specs/issues/4870)
* Moves blob KZG commitments to the earlier point where builder intent
is known
* Removes duplicated commitments from data column sidecars which saves
descent b/w per slot
* Enables nodes to start fetching blobs via getBlobs as soon as the bid
is received
* Slightly increases bid size and may add minor bidding channel latency
but the tradeoff favors lower network load and simpler DA handling
2026-02-06 23:07:54 +00:00
Potuz
c3e74e4a5d Remove unused method in forkchoice (#16337) 2026-02-06 19:21:58 +00:00
Potuz
e7ae6a004b Remove unused method in forkchoice (#16331) 2026-02-06 15:30:50 +00:00
Bastin
862fb2eb4a Fix gen-logs.sh - gitignore bug (#16328)
**What does this PR do? Why is it needed?**
`gen-logs.sh` was skipping `cmd/beacon-chain/execution/` due to a rule
in `.gitignore`.
Added a fix in `gen-logs.sh` to ignore `.gitignore` entries by
specification.
2026-02-05 14:06:42 +00:00
Potuz
bb80a9c832 Remove unused map in forkchoice (#16329)
nodeByPayload was not being used.
2026-02-05 13:25:06 +00:00
Bastin
c1b668a50a Fix logging issue (#16322)
**What does this PR do? Why is it needed?**
This PR, in an attempt to fix the logging issue described in #16314,
does the following:
- Adds a new field `Identifier` to the `WriterHook` struct, and filters
out log entries that have the key `log_target` and the value of the
hook's `Identifier`. For now the identifiers are `ephemeral` and `user`,
differentiating between the user facing terminal/log file, and the
debugger facing ephemeral log file.
- Stores the value of the `--verbosity` and `--log.vmodule` flags in
`io/logs`, so it can be accessed by packages that need to know the
verbosity they're logging with. (note that since #16272 each package can
have a different verbosity, so verbosity is now defined per package
instead of globally)
- Improves the calculation of the global logging level by ignoring the
`ephemeralLogFileVerbosity` when the `--disable-ephemeral-log-file` flag
is enabled.
- Uses these added logic to fix the problem in
`logStateTransitionData()` (described in #16314)

Note: since we're saving this new data in `io/logs`, we should refactor
`prefixFormatter` to read the data from here. but that creates a
circular import error. I will try to fix this and refactor the formatter
in a future PR.

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-05 10:24:50 +00:00
Justin Traglia
fab687d96d Improve ethspecify integration (#16304)
**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

* Move the ethspecify config from `/specrefs/.ethspecify` to
`/.ethspecify`.
* This allows developers to use inline specrefs (eg spec functions in
godoc comments).
* To do this, simply add a spec tag and run `ethspecify` to populate it.
* Clean up specref exceptions; organize by upgrade & put items in the
correct section.
* Update a few godoc comments to use the new inline specref feature.
* Update check-specrefs GitHub action so that it enforces up-to-date
godocs.
* Standardize specref naming; requiring a `#fork` tag for everything.
* Add new specrefs (which haven't been implemented yet) which were
missing.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2026-02-04 18:44:01 +00:00
james-prysm
cf94ccbf72 node fallback cleanup (#16316)
**What type of PR is this?**

 Other

**What does this PR do? Why is it needed?**

Follow up to https://github.com/OffchainLabs/prysm/pull/16215 this pr
improves logging, fixes stuttering in package naming, adds additional
unit tests, and deduplicates fallback node code.

**Which issues(s) does this PR fix?**

fixes a potential race if reconnecting to the same host very quickly
which has a stale connection still.

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-04 15:59:42 +00:00
281 changed files with 10796 additions and 7398 deletions

View File

@@ -1,25 +1,39 @@
version: v1.7.0-alpha.1
version: v1.7.0-alpha.2
style: full
specrefs:
search_root: ..
search_root: .
auto_standardize_names: true
auto_add_missing_entries: true
require_exceptions_have_fork: true
files:
- configs.yml
- constants.yml
- containers.yml
- dataclasses.yml
- functions.yml
- presets.yml
- specrefs/configs.yml
- specrefs/constants.yml
- specrefs/containers.yml
- specrefs/dataclasses.yml
- specrefs/functions.yml
- specrefs/presets.yml
exceptions:
presets:
# Not implemented: gloas (future fork)
# gloas
- BUILDER_PENDING_WITHDRAWALS_LIMIT#gloas
- MAX_PAYLOAD_ATTESTATIONS#gloas
- PTC_SIZE#gloas
constants:
# Constants in the KZG library
# phase0
- BASIS_POINTS#phase0
- ENDIANNESS#phase0
- MAX_CONCURRENT_REQUESTS#phase0
- UINT64_MAX#phase0
- UINT64_MAX_SQRT#phase0
# altair
- PARTICIPATION_FLAG_WEIGHTS#altair
# bellatrix
- SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY#bellatrix
# deneb
- BLS_MODULUS#deneb
- BYTES_PER_COMMITMENT#deneb
- BYTES_PER_FIELD_ELEMENT#deneb
@@ -33,18 +47,9 @@ exceptions:
- PRIMITIVE_ROOT_OF_UNITY#deneb
- RANDOM_CHALLENGE_KZG_BATCH_DOMAIN#deneb
- RANDOM_CHALLENGE_KZG_CELL_BATCH_DOMAIN#fulu
# Not implemented
- BASIS_POINTS#phase0
- ENDIANNESS#phase0
- MAX_CONCURRENT_REQUESTS#phase0
- PARTICIPATION_FLAG_WEIGHTS#altair
- SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY#bellatrix
# fulu
- UINT256_MAX#fulu
- UINT64_MAX#phase0
- UINT64_MAX_SQRT#phase0
# Not implemented: gloas (future fork)
# gloas
- BUILDER_PAYMENT_THRESHOLD_DENOMINATOR#gloas
- BUILDER_PAYMENT_THRESHOLD_NUMERATOR#gloas
- BUILDER_WITHDRAWAL_PREFIX#gloas
@@ -61,61 +66,62 @@ exceptions:
- PTC_TIMELINESS_INDEX#gloas
configs:
# Not implemented: gloas (future fork)
# gloas
- AGGREGATE_DUE_BPS_GLOAS#gloas
- ATTESTATION_DUE_BPS_GLOAS#gloas
- CONTRIBUTION_DUE_BPS_GLOAS#gloas
- GLOAS_FORK_EPOCH#gloas
- GLOAS_FORK_VERSION#gloas
- MAX_REQUEST_PAYLOADS#gloas
- MIN_BUILDER_WITHDRAWABILITY_DELAY#gloas
- PAYLOAD_ATTESTATION_DUE_BPS#gloas
- SYNC_MESSAGE_DUE_BPS_GLOAS#gloas
- MIN_BUILDER_WITHDRAWABILITY_DELAY#gloas
ssz_objects:
# Not implemented
# phase0
- Eth1Block#phase0
- MatrixEntry#fulu
# Not implemented: capella
# capella
- LightClientBootstrap#capella
- LightClientFinalityUpdate#capella
- LightClientOptimisticUpdate#capella
- LightClientUpdate#capella
# Not implemented: gloas (future fork)
# fulu
- MatrixEntry#fulu
# gloas
- BeaconBlockBody#gloas
- BeaconState#gloas
- Builder#gloas
- BuilderPendingPayment#gloas
- BuilderPendingWithdrawal#gloas
- DataColumnSidecar#gloas
- ExecutionPayloadEnvelope#gloas
- ExecutionPayloadBid#gloas
- ExecutionPayloadEnvelope#gloas
- ForkChoiceNode#gloas
- IndexedPayloadAttestation#gloas
- PayloadAttestation#gloas
- PayloadAttestationData#gloas
- PayloadAttestationMessage#gloas
- SignedExecutionPayloadEnvelope#gloas
- SignedExecutionPayloadBid#gloas
- Builder#gloas
- ProposerPreferences#gloas
- SignedExecutionPayloadBid#gloas
- SignedExecutionPayloadEnvelope#gloas
- SignedProposerPreferences#gloas
dataclasses:
# Not implemented
- BlobParameters#fulu
- ExpectedWithdrawals#capella
- ExpectedWithdrawals#electra
# phase0
- LatestMessage#phase0
- LightClientStore#altair
- OptimisticStore#bellatrix
- Store#phase0
# Not implemented: capella
# altair
- LightClientStore#altair
# bellatrix
- OptimisticStore#bellatrix
# capella
- ExpectedWithdrawals#capella
- LightClientStore#capella
# Not implemented: gloas (future fork)
# electra
- ExpectedWithdrawals#electra
# fulu
- BlobParameters#fulu
# gloas
- ExpectedWithdrawals#gloas
- LatestMessage#gloas
- Store#gloas
@@ -140,7 +146,6 @@ exceptions:
- g1_lincomb#deneb
- hash_to_bls_field#deneb
- is_power_of_two#deneb
- multi_exp#deneb
- reverse_bits#deneb
- validate_kzg_g1#deneb
- verify_blob_kzg_proof#deneb
@@ -175,7 +180,12 @@ exceptions:
- verify_cell_kzg_proof_batch#fulu
- verify_cell_kzg_proof_batch_impl#fulu
# Not implemented: phase0
# phase0
- update_proposer_boost_root#phase0
- is_proposer_equivocation#phase0
- record_block_timeliness#phase0
- compute_proposer_score#phase0
- get_attestation_score#phase0
- calculate_committee_fraction#phase0
- compute_fork_version#phase0
- compute_pulled_up_tip#phase0
@@ -221,8 +231,7 @@ exceptions:
- validate_on_attestation#phase0
- validate_target_epoch_against_current_time#phase0
- xor#phase0
# Not implemented: altair
# altair
- compute_merkle_proof#altair
- compute_sync_committee_period_at_slot#altair
- get_contribution_and_proof#altair
@@ -244,27 +253,29 @@ exceptions:
- process_sync_committee_contributions#altair
- set_or_append_list#altair
- validate_light_client_update#altair
# Not implemented: bellatrix
# bellatrix
- get_execution_payload#bellatrix
- is_merge_transition_block#bellatrix
- is_optimistic_candidate_block#bellatrix
- latest_verified_ancestor#bellatrix
- prepare_execution_payload#bellatrix
# Not implemented: capella
# capella
- apply_withdrawals#capella
- get_balance_after_withdrawals#capella
- get_lc_execution_root#capella
- get_validators_sweep_withdrawals#capella
- is_valid_light_client_header#capella
- prepare_execution_payload#capella
- process_epoch#capella
- update_next_withdrawal_index#capella
- update_next_withdrawal_validator_index#capella
- upgrade_lc_bootstrap_to_capella#capella
- upgrade_lc_finality_update_to_capella#capella
- upgrade_lc_header_to_capella#capella
- upgrade_lc_optimistic_update_to_capella#capella
- upgrade_lc_store_to_capella#capella
- upgrade_lc_update_to_capella#capella
# Not implemented: deneb
# deneb
- get_lc_execution_root#deneb
- is_valid_light_client_header#deneb
- prepare_execution_payload#deneb
@@ -274,33 +285,34 @@ exceptions:
- upgrade_lc_optimistic_update_to_deneb#deneb
- upgrade_lc_store_to_deneb#deneb
- upgrade_lc_update_to_deneb#deneb
# Not implemented: electra
# electra
- compute_weak_subjectivity_period#electra
- current_sync_committee_gindex_at_slot#electra
- finalized_root_gindex_at_slot#electra
- get_eth1_vote#electra
- get_lc_execution_root#electra
- get_pending_partial_withdrawals#electra
- get_validators_sweep_withdrawals#electra
- is_compounding_withdrawal_credential#electra
- is_eligible_for_partial_withdrawals#electra
- is_within_weak_subjectivity_period#electra
- next_sync_committee_gindex_at_slot#electra
- normalize_merkle_branch#electra
- prepare_execution_payload#electra
- update_pending_partial_withdrawals#electra
- upgrade_lc_bootstrap_to_electra#electra
- upgrade_lc_finality_update_to_electra#electra
- upgrade_lc_header_to_electra#electra
- upgrade_lc_optimistic_update_to_electra#electra
- upgrade_lc_store_to_electra#electra
- upgrade_lc_update_to_electra#electra
# Not implemented: fulu
# fulu
- compute_matrix#fulu
- get_blob_parameters#fulu
- get_data_column_sidecars_from_block#fulu
- get_data_column_sidecars_from_column_sidecar#fulu
- recover_matrix#fulu
# Not implemented: gloas (future fork)
# gloas
- compute_balance_weighted_acceptance#gloas
- compute_balance_weighted_selection#gloas
- compute_fork_version#gloas
@@ -368,49 +380,42 @@ exceptions:
- verify_execution_payload_bid_signature#gloas
- add_builder_to_registry#gloas
- apply_deposit_for_builder#gloas
- apply_withdrawals#capella
- apply_withdrawals#gloas
- can_builder_cover_bid#gloas
- compute_proposer_score#phase0
- convert_builder_index_to_validator_index#gloas
- convert_validator_index_to_builder_index#gloas
- get_attestation_score#gloas
- get_attestation_score#phase0
- get_balance_after_withdrawals#capella
- get_builder_from_deposit#gloas
- get_builder_withdrawals#gloas
- get_builders_sweep_withdrawals#gloas
- get_index_for_new_builder#gloas
- get_pending_balance_to_withdraw_for_builder#gloas
- get_pending_partial_withdrawals#electra
- get_proposer_preferences_signature#gloas
- get_upcoming_proposal_slots#gloas
- get_validators_sweep_withdrawals#capella
- get_validators_sweep_withdrawals#electra
- initiate_builder_exit#gloas
- is_active_builder#gloas
- is_builder_index#gloas
- is_data_available#gloas
- is_eligible_for_partial_withdrawals#electra
- is_head_late#gloas
- is_head_weak#gloas
- is_parent_strong#gloas
- is_proposer_equivocation#phase0
- is_valid_proposal_slot#gloas
- onboard_builders_from_pending_deposits#gloas
- process_deposit_request#gloas
- process_voluntary_exit#gloas
- record_block_timeliness#gloas
- record_block_timeliness#phase0
- verify_data_column_sidecar_kzg_proofs#gloas
- should_apply_proposer_boost#gloas
- update_builder_pending_withdrawals#gloas
- update_next_withdrawal_builder_index#gloas
- update_next_withdrawal_index#capella
- update_next_withdrawal_validator_index#capella
- update_payload_expected_withdrawals#gloas
- update_pending_partial_withdrawals#electra
- update_proposer_boost_root#gloas
- update_proposer_boost_root#phase0
presets:
# gloas
- BUILDER_PENDING_WITHDRAWALS_LIMIT#gloas
- BUILDER_REGISTRY_LIMIT#gloas
- MAX_BUILDERS_PER_WITHDRAWALS_SWEEP#gloas

View File

@@ -12,11 +12,11 @@ jobs:
- name: Check version consistency
run: |
WORKSPACE_VERSION=$(grep 'consensus_spec_version = ' WORKSPACE | sed 's/.*"\(.*\)"/\1/')
ETHSPECIFY_VERSION=$(grep '^version:' specrefs/.ethspecify.yml | sed 's/version: //')
ETHSPECIFY_VERSION=$(grep '^version:' .ethspecify.yml | sed 's/version: //')
if [ "$WORKSPACE_VERSION" != "$ETHSPECIFY_VERSION" ]; then
echo "Version mismatch between WORKSPACE and ethspecify"
echo " WORKSPACE: $WORKSPACE_VERSION"
echo " specrefs/.ethspecify.yml: $ETHSPECIFY_VERSION"
echo " .ethspecify.yml: $ETHSPECIFY_VERSION"
exit 1
else
echo "Versions match: $WORKSPACE_VERSION"
@@ -26,7 +26,7 @@ jobs:
run: python3 -mpip install ethspecify
- name: Update spec references
run: ethspecify process --path=specrefs
run: ethspecify
- name: Check for differences
run: |
@@ -40,4 +40,4 @@ jobs:
fi
- name: Check spec references
run: ethspecify check --path=specrefs
run: ethspecify check

View File

@@ -273,16 +273,16 @@ filegroup(
url = "https://github.com/ethereum/EIPs/archive/5480440fe51742ed23342b68cf106cefd427e39d.tar.gz",
)
consensus_spec_version = "v1.7.0-alpha.1"
consensus_spec_version = "v1.7.0-alpha.2"
load("@prysm//tools:download_spectests.bzl", "consensus_spec_tests")
consensus_spec_tests(
name = "consensus_spec_tests",
flavors = {
"general": "sha256-j5R3jA7Oo4OSDMTvpMuD+8RomaCByeFSwtfkq6fL0Zg=",
"minimal": "sha256-tdTqByoyswOS4r6OxFmo70y2BP7w1TgEok+gf4cbxB0=",
"mainnet": "sha256-5gB4dt6SnSDKzdBc06VedId3NkgvSYyv9n9FRxWKwYI=",
"general": "sha256-iGQsGZ1cHah+2CSod9jC3kN8Ku4n6KO0hIwfINrn/po=",
"minimal": "sha256-TgcYt8N8sXSttdHTGvOa+exUZ1zn1UzlAMz0V7i37xc=",
"mainnet": "sha256-LnXyiLoJtrvEvbqLDSAAqpLMdN/lXv92SAgYG8fNjCs=",
},
version = consensus_spec_version,
)
@@ -298,7 +298,7 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-J+43DrK1pF658kTXTwMS6zGf4KDjvas++m8w2a8swpg=",
integrity = "sha256-Y/67Dg393PksZj5rTFNLntiJ6hNdB7Rxbu5gZE2gebY=",
strip_prefix = "consensus-specs-" + consensus_spec_version[1:],
url = "https://github.com/ethereum/consensus-specs/archive/refs/tags/%s.tar.gz" % consensus_spec_version,
)

19
api/fallback/BUILD.bazel Normal file
View File

@@ -0,0 +1,19 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"fallback.go",
"log.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/fallback",
visibility = ["//visibility:public"],
deps = ["@com_github_sirupsen_logrus//:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["fallback_test.go"],
embed = [":go_default_library"],
deps = ["//testing/assert:go_default_library"],
)

66
api/fallback/fallback.go Normal file
View File

@@ -0,0 +1,66 @@
package fallback
import (
"context"
"github.com/sirupsen/logrus"
)
// HostProvider is the subset of connection-provider methods that EnsureReady
// needs. Both grpc.GrpcConnectionProvider and rest.RestConnectionProvider
// satisfy this interface.
type HostProvider interface {
Hosts() []string
CurrentHost() string
SwitchHost(index int) error
}
// ReadyChecker can report whether the current endpoint is ready.
// iface.NodeClient satisfies this implicitly.
type ReadyChecker interface {
IsReady(ctx context.Context) bool
}
// EnsureReady iterates through the configured hosts and returns true as soon as
// one responds as ready. It starts from the provider's current host and wraps
// around using modular arithmetic, performing failover when a host is not ready.
func EnsureReady(ctx context.Context, provider HostProvider, checker ReadyChecker) bool {
hosts := provider.Hosts()
numHosts := len(hosts)
startingHost := provider.CurrentHost()
var attemptedHosts []string
// Find current index
currentIdx := 0
for i, h := range hosts {
if h == startingHost {
currentIdx = i
break
}
}
for i := range numHosts {
if checker.IsReady(ctx) {
if len(attemptedHosts) > 0 {
log.WithFields(logrus.Fields{
"previous": startingHost,
"current": provider.CurrentHost(),
"tried": attemptedHosts,
}).Info("Switched to responsive beacon node")
}
return true
}
attemptedHosts = append(attemptedHosts, provider.CurrentHost())
// Try next host if not the last iteration
if i < numHosts-1 {
nextIdx := (currentIdx + i + 1) % numHosts
if err := provider.SwitchHost(nextIdx); err != nil {
log.WithError(err).Error("Failed to switch host")
}
}
}
log.WithField("tried", attemptedHosts).Warn("No responsive beacon node found")
return false
}

View File

@@ -0,0 +1,94 @@
package fallback
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
)
// mockHostProvider is a minimal HostProvider for unit tests.
type mockHostProvider struct {
hosts []string
hostIndex int
}
func (m *mockHostProvider) Hosts() []string { return m.hosts }
func (m *mockHostProvider) CurrentHost() string {
return m.hosts[m.hostIndex%len(m.hosts)]
}
func (m *mockHostProvider) SwitchHost(index int) error { m.hostIndex = index; return nil }
// mockReadyChecker records per-call IsReady results in sequence.
type mockReadyChecker struct {
results []bool
idx int
}
func (m *mockReadyChecker) IsReady(_ context.Context) bool {
if m.idx >= len(m.results) {
return false
}
r := m.results[m.idx]
m.idx++
return r
}
func TestEnsureReady_SingleHostReady(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}
func TestEnsureReady_SingleHostNotReady(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_SingleHostError(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_MultipleHostsFirstReady(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}
func TestEnsureReady_MultipleHostsFailoverToSecond(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{false, true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 1, provider.hostIndex)
}
func TestEnsureReady_MultipleHostsNoneReady(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{false, false, false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_WrapAroundFromNonZeroIndex(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host0:3500", "http://host1:3500", "http://host2:3500"},
hostIndex: 1,
}
// host1 (start) fails, host2 fails, host0 succeeds
checker := &mockReadyChecker{results: []bool{false, false, true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}

View File

@@ -1,9 +1,9 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package blocks
package fallback
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "consensus-types/blocks")
var log = logrus.WithField("package", "api/fallback")

View File

@@ -25,6 +25,11 @@ type GrpcConnectionProvider interface {
// SwitchHost switches to the endpoint at the given index.
// The new connection is created lazily on next CurrentConn() call.
SwitchHost(index int) error
// ConnectionCounter returns a monotonically increasing counter that increments
// each time SwitchHost changes the active endpoint. This allows consumers to
// detect connection changes even when the host string returns to a previous value
// (e.g., host0 → host1 → host0).
ConnectionCounter() uint64
// Close closes the current connection.
Close()
}
@@ -38,6 +43,7 @@ type grpcConnectionProvider struct {
// Current connection state (protected by mutex)
currentIndex uint64
conn *grpc.ClientConn
connCounter uint64
mu sync.Mutex
closed bool
@@ -138,6 +144,7 @@ func (p *grpcConnectionProvider) SwitchHost(index int) error {
p.conn = nil // Clear immediately - new connection created lazily
p.currentIndex = uint64(index)
p.connCounter++
// Close old connection asynchronously to avoid blocking the caller
if oldConn != nil {
@@ -155,6 +162,12 @@ func (p *grpcConnectionProvider) SwitchHost(index int) error {
return nil
}
func (p *grpcConnectionProvider) ConnectionCounter() uint64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.connCounter
}
func (p *grpcConnectionProvider) Close() {
p.mu.Lock()
defer p.mu.Unlock()

View File

@@ -4,17 +4,24 @@ import "google.golang.org/grpc"
// MockGrpcProvider implements GrpcConnectionProvider for testing.
type MockGrpcProvider struct {
MockConn *grpc.ClientConn
MockHosts []string
MockConn *grpc.ClientConn
MockHosts []string
CurrentIndex int
ConnCounter uint64
}
func (m *MockGrpcProvider) CurrentConn() *grpc.ClientConn { return m.MockConn }
func (m *MockGrpcProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[0]
return m.MockHosts[m.CurrentIndex]
}
return ""
}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(int) error { return nil }
func (m *MockGrpcProvider) Close() {}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(idx int) error {
m.CurrentIndex = idx
m.ConnCounter++
return nil
}
func (m *MockGrpcProvider) ConnectionCounter() uint64 { return m.ConnCounter }
func (m *MockGrpcProvider) Close() {}

View File

@@ -9,13 +9,13 @@ import (
// MockRestProvider implements RestConnectionProvider for testing.
type MockRestProvider struct {
MockClient *http.Client
MockHandler RestHandler
MockHandler Handler
MockHosts []string
HostIndex int
}
func (m *MockRestProvider) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestProvider) RestHandler() RestHandler { return m.MockHandler }
func (m *MockRestProvider) Handler() Handler { return m.MockHandler }
func (m *MockRestProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[m.HostIndex%len(m.MockHosts)]
@@ -25,25 +25,22 @@ func (m *MockRestProvider) CurrentHost() string {
func (m *MockRestProvider) Hosts() []string { return m.MockHosts }
func (m *MockRestProvider) SwitchHost(index int) error { m.HostIndex = index; return nil }
// MockRestHandler implements RestHandler for testing.
type MockRestHandler struct {
MockHost string
MockClient *http.Client
// MockHandler implements Handler for testing.
type MockHandler struct {
MockHost string
}
func (m *MockRestHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockRestHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
func (m *MockHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
return http.StatusOK, nil
}
func (m *MockRestHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
func (m *MockHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
func (m *MockHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
return nil
}
func (m *MockRestHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
func (m *MockHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestHandler) Host() string { return m.MockHost }
func (m *MockRestHandler) SwitchHost(host string) { m.MockHost = host }
func (m *MockHandler) Host() string { return m.MockHost }

View File

@@ -17,8 +17,8 @@ import (
type RestConnectionProvider interface {
// HttpClient returns the configured HTTP client with headers, timeout, and optional tracing.
HttpClient() *http.Client
// RestHandler returns the REST handler for making API requests.
RestHandler() RestHandler
// Handler returns the REST handler for making API requests.
Handler() Handler
// CurrentHost returns the current REST API endpoint URL.
CurrentHost() string
// Hosts returns all configured REST API endpoint URLs.
@@ -54,7 +54,7 @@ func WithTracing() RestConnectionProviderOption {
type restConnectionProvider struct {
endpoints []string
httpClient *http.Client
restHandler RestHandler
restHandler *handler
currentIndex atomic.Uint64
timeout time.Duration
headers map[string][]string
@@ -96,7 +96,7 @@ func NewRestConnectionProvider(endpoint string, opts ...RestConnectionProviderOp
}
// Create the REST handler with the HTTP client and initial host
p.restHandler = newRestHandler(*p.httpClient, endpoints[0])
p.restHandler = newHandler(*p.httpClient, endpoints[0])
log.WithFields(logrus.Fields{
"endpoints": endpoints,
@@ -124,7 +124,7 @@ func (p *restConnectionProvider) HttpClient() *http.Client {
return p.httpClient
}
func (p *restConnectionProvider) RestHandler() RestHandler {
func (p *restConnectionProvider) Handler() Handler {
return p.restHandler
}

View File

@@ -21,32 +21,35 @@ import (
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
// Handler defines the interface for making REST API requests.
type Handler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SwitchHost(host string)
}
type restHandler struct {
type handler struct {
client http.Client
host string
reqOverrides []reqOption
}
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
// newHandler returns a *handler for internal use within the rest package.
func newHandler(client http.Client, host string) *handler {
rh := &handler{
client: client,
host: host,
}
rh.appendAcceptOverride()
return rh
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
// NewHandler returns a Handler
func NewHandler(client http.Client, host string) Handler {
rh := &handler{
client: client,
host: host,
}
@@ -57,7 +60,7 @@ func NewRestHandler(client http.Client, host string) RestHandler {
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *restHandler) appendAcceptOverride() {
func (c *handler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
@@ -66,18 +69,18 @@ func (c *restHandler) appendAcceptOverride() {
}
// HttpClient returns the underlying HTTP client of the handler
func (c *restHandler) HttpClient() *http.Client {
func (c *handler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *restHandler) Host() string {
func (c *handler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (c *handler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -100,7 +103,7 @@ func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
func (c *handler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -119,7 +122,7 @@ func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int,
return httpResp.StatusCode, nil
}
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (c *handler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -174,7 +177,7 @@ func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Post(
func (c *handler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -210,7 +213,7 @@ func (c *restHandler) Post(
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *restHandler) PostSSZ(
func (c *handler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -311,6 +314,6 @@ func decodeResp(httpResp *http.Response, resp any) error {
return nil
}
func (c *restHandler) SwitchHost(host string) {
func (c *handler) SwitchHost(host string) {
c.host = host
}

View File

@@ -509,17 +509,17 @@ func (s *SignedBlindedBeaconBlockFulu) SigString() string {
// ----------------------------------------------------------------------------
type ExecutionPayloadBid struct {
ParentBlockHash string `json:"parent_block_hash"`
ParentBlockRoot string `json:"parent_block_root"`
BlockHash string `json:"block_hash"`
PrevRandao string `json:"prev_randao"`
FeeRecipient string `json:"fee_recipient"`
GasLimit string `json:"gas_limit"`
BuilderIndex string `json:"builder_index"`
Slot string `json:"slot"`
Value string `json:"value"`
ExecutionPayment string `json:"execution_payment"`
BlobKzgCommitmentsRoot string `json:"blob_kzg_commitments_root"`
ParentBlockHash string `json:"parent_block_hash"`
ParentBlockRoot string `json:"parent_block_root"`
BlockHash string `json:"block_hash"`
PrevRandao string `json:"prev_randao"`
FeeRecipient string `json:"fee_recipient"`
GasLimit string `json:"gas_limit"`
BuilderIndex string `json:"builder_index"`
Slot string `json:"slot"`
Value string `json:"value"`
ExecutionPayment string `json:"execution_payment"`
BlobKzgCommitments []string `json:"blob_kzg_commitments"`
}
type SignedExecutionPayloadBid struct {

View File

@@ -2939,18 +2939,22 @@ func SignedExecutionPayloadBidFromConsensus(b *eth.SignedExecutionPayloadBid) *S
}
func ExecutionPayloadBidFromConsensus(b *eth.ExecutionPayloadBid) *ExecutionPayloadBid {
blobKzgCommitments := make([]string, len(b.BlobKzgCommitments))
for i := range b.BlobKzgCommitments {
blobKzgCommitments[i] = hexutil.Encode(b.BlobKzgCommitments[i])
}
return &ExecutionPayloadBid{
ParentBlockHash: hexutil.Encode(b.ParentBlockHash),
ParentBlockRoot: hexutil.Encode(b.ParentBlockRoot),
BlockHash: hexutil.Encode(b.BlockHash),
PrevRandao: hexutil.Encode(b.PrevRandao),
FeeRecipient: hexutil.Encode(b.FeeRecipient),
GasLimit: fmt.Sprintf("%d", b.GasLimit),
BuilderIndex: fmt.Sprintf("%d", b.BuilderIndex),
Slot: fmt.Sprintf("%d", b.Slot),
Value: fmt.Sprintf("%d", b.Value),
ExecutionPayment: fmt.Sprintf("%d", b.ExecutionPayment),
BlobKzgCommitmentsRoot: hexutil.Encode(b.BlobKzgCommitmentsRoot),
ParentBlockHash: hexutil.Encode(b.ParentBlockHash),
ParentBlockRoot: hexutil.Encode(b.ParentBlockRoot),
BlockHash: hexutil.Encode(b.BlockHash),
PrevRandao: hexutil.Encode(b.PrevRandao),
FeeRecipient: hexutil.Encode(b.FeeRecipient),
GasLimit: fmt.Sprintf("%d", b.GasLimit),
BuilderIndex: fmt.Sprintf("%d", b.BuilderIndex),
Slot: fmt.Sprintf("%d", b.Slot),
Value: fmt.Sprintf("%d", b.Value),
ExecutionPayment: fmt.Sprintf("%d", b.ExecutionPayment),
BlobKzgCommitments: blobKzgCommitments,
}
}
@@ -3187,22 +3191,30 @@ func (b *ExecutionPayloadBid) ToConsensus() (*eth.ExecutionPayloadBid, error) {
if err != nil {
return nil, server.NewDecodeError(err, "ExecutionPayment")
}
blobKzgCommitmentsRoot, err := bytesutil.DecodeHexWithLength(b.BlobKzgCommitmentsRoot, fieldparams.RootLength)
err = slice.VerifyMaxLength(b.BlobKzgCommitments, fieldparams.MaxBlobCommitmentsPerBlock)
if err != nil {
return nil, server.NewDecodeError(err, "BlobKzgCommitmentsRoot")
return nil, server.NewDecodeError(err, "BlobKzgCommitments")
}
blobKzgCommitments := make([][]byte, len(b.BlobKzgCommitments))
for i, commitment := range b.BlobKzgCommitments {
kzg, err := bytesutil.DecodeHexWithLength(commitment, fieldparams.BLSPubkeyLength)
if err != nil {
return nil, server.NewDecodeError(err, fmt.Sprintf("BlobKzgCommitments[%d]", i))
}
blobKzgCommitments[i] = kzg
}
return &eth.ExecutionPayloadBid{
ParentBlockHash: parentBlockHash,
ParentBlockRoot: parentBlockRoot,
BlockHash: blockHash,
PrevRandao: prevRandao,
FeeRecipient: feeRecipient,
GasLimit: gasLimit,
BuilderIndex: primitives.BuilderIndex(builderIndex),
Slot: primitives.Slot(slot),
Value: primitives.Gwei(value),
ExecutionPayment: primitives.Gwei(executionPayment),
BlobKzgCommitmentsRoot: blobKzgCommitmentsRoot,
ParentBlockHash: parentBlockHash,
ParentBlockRoot: parentBlockRoot,
BlockHash: blockHash,
PrevRandao: prevRandao,
FeeRecipient: feeRecipient,
GasLimit: gasLimit,
BuilderIndex: primitives.BuilderIndex(builderIndex),
Slot: primitives.Slot(slot),
Value: primitives.Gwei(value),
ExecutionPayment: primitives.Gwei(executionPayment),
BlobKzgCommitments: blobKzgCommitments,
}, nil
}

View File

@@ -27,6 +27,7 @@ go_library(
"receive_blob.go",
"receive_block.go",
"receive_data_column.go",
"receive_payload_attestation_message.go",
"service.go",
"setup_forkchoice.go",
"tracked_proposer.go",
@@ -85,6 +86,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/logs:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//monitoring/tracing/trace:go_default_library",

View File

@@ -110,7 +110,7 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
ckzgCells := make([]ckzg4844.Cell, len(cells))
for i := range cells {
ckzgCells[i] = ckzg4844.Cell(cells[i])
copy(ckzgCells[i][:], cells[i][:])
}
return ckzg4844.VerifyCellKZGProofBatch(commitmentsBytes, cellIndices, ckzgCells, proofsBytes)
}

View File

@@ -10,6 +10,7 @@ import (
consensus_types "github.com/OffchainLabs/prysm/v7/consensus-types"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/io/logs"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
@@ -87,36 +88,45 @@ func logStateTransitionData(b interfaces.ReadOnlyBeaconBlock) error {
func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte, justified, finalized *ethpb.Checkpoint, receivedTime time.Time, genesis time.Time, daWaitedTime time.Duration) error {
startTime, err := slots.StartTime(genesis, block.Slot())
if err != nil {
return err
return errors.Wrap(err, "failed to get slot start time")
}
level := log.Logger.GetLevel()
parentRoot := block.ParentRoot()
blkRoot := fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8])
finalizedRoot := fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8])
sinceSlotStartTime := prysmTime.Now().Sub(startTime)
lessFields := logrus.Fields{
"slot": block.Slot(),
"block": blkRoot,
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": finalizedRoot,
"epoch": slots.ToEpoch(block.Slot()),
"sinceSlotStartTime": sinceSlotStartTime,
}
moreFields := logrus.Fields{
"slot": block.Slot(),
"slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch,
"block": blkRoot,
"epoch": slots.ToEpoch(block.Slot()),
"justifiedEpoch": justified.Epoch,
"justifiedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(justified.Root)[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": finalizedRoot,
"parentRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(parentRoot[:])[:8]),
"version": version.String(block.Version()),
"sinceSlotStartTime": sinceSlotStartTime,
"chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime) - daWaitedTime,
"dataAvailabilityWaitedTime": daWaitedTime,
}
level := logs.PackageVerbosity("beacon-chain/blockchain")
if level >= logrus.DebugLevel {
parentRoot := block.ParentRoot()
lf := logrus.Fields{
"slot": block.Slot(),
"slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch,
"block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
"epoch": slots.ToEpoch(block.Slot()),
"justifiedEpoch": justified.Epoch,
"justifiedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(justified.Root)[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8]),
"parentRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(parentRoot[:])[:8]),
"version": version.String(block.Version()),
"sinceSlotStartTime": prysmTime.Now().Sub(startTime),
"chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime) - daWaitedTime,
"dataAvailabilityWaitedTime": daWaitedTime,
}
log.WithFields(lf).Debug("Synced new block")
} else {
log.WithFields(logrus.Fields{
"slot": block.Slot(),
"block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8]),
"epoch": slots.ToEpoch(block.Slot()),
}).Info("Synced new block")
log.WithFields(moreFields).Info("Synced new block")
return nil
}
log.WithFields(lessFields).WithField(logs.LogTargetField, logs.LogTargetUser).Info("Synced new block")
log.WithFields(moreFields).WithField(logs.LogTargetField, logs.LogTargetEphemeral).Info("Synced new block")
return nil
}

View File

@@ -0,0 +1,19 @@
package blockchain
import (
"context"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
// PayloadAttestationReceiver interface defines the methods of chain service for receiving
// validated payload attestation messages.
type PayloadAttestationReceiver interface {
ReceivePayloadAttestationMessage(context.Context, *ethpb.PayloadAttestationMessage) error
}
// ReceivePayloadAttestationMessage accepts a payload attestation message.
func (s *Service) ReceivePayloadAttestationMessage(ctx context.Context, a *ethpb.PayloadAttestationMessage) error {
// TODO: Handle payload attestation message processing once Gloas is fully wired.
return nil
}

View File

@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -757,6 +757,11 @@ func (c *ChainService) ReceiveDataColumns(dcs []blocks.VerifiedRODataColumn) err
return nil
}
// ReceivePayloadAttestationMessage implements the same method in the chain service.
func (c *ChainService) ReceivePayloadAttestationMessage(_ context.Context, _ *ethpb.PayloadAttestationMessage) error {
return nil
}
// DependentRootForEpoch mocks the same method in the chain service
func (c *ChainService) DependentRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) {
return c.TargetRoot, nil

View File

@@ -17,6 +17,7 @@ go_library(
"error.go",
"interfaces.go",
"log.go",
"payload_attestation.go",
"payload_id.go",
"proposer_indices.go",
"proposer_indices_disabled.go", # keep
@@ -76,6 +77,7 @@ go_test(
"checkpoint_state_test.go",
"committee_fuzz_test.go",
"committee_test.go",
"payload_attestation_test.go",
"payload_id_test.go",
"private_access_test.go",
"proposer_indices_test.go",

View File

@@ -0,0 +1,53 @@
package cache
import (
"sync"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
// PayloadAttestationCache tracks seen payload attestation messages for a single slot.
type PayloadAttestationCache struct {
slot primitives.Slot
seen map[primitives.ValidatorIndex]struct{}
mu sync.RWMutex
}
// Seen returns true if a vote for the given slot has already been
// processed for this validator index.
func (p *PayloadAttestationCache) Seen(slot primitives.Slot, idx primitives.ValidatorIndex) bool {
p.mu.RLock()
defer p.mu.RUnlock()
if p.slot != slot {
return false
}
if p.seen == nil {
return false
}
_, ok := p.seen[idx]
return ok
}
// Add marks the given slot and validator index as seen.
// This function assumes that the message has already been validated.
func (p *PayloadAttestationCache) Add(slot primitives.Slot, idx primitives.ValidatorIndex) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.slot != slot {
p.slot = slot
p.seen = make(map[primitives.ValidatorIndex]struct{})
}
if p.seen == nil {
p.seen = make(map[primitives.ValidatorIndex]struct{})
}
p.seen[idx] = struct{}{}
return nil
}
// Clear clears the internal cache.
func (p *PayloadAttestationCache) Clear() {
p.mu.Lock()
defer p.mu.Unlock()
p.slot = 0
p.seen = nil
}

View File

@@ -0,0 +1,48 @@
package cache_test
import (
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/stretchr/testify/require"
)
func TestPayloadAttestationCache_SeenAndAdd(t *testing.T) {
var c cache.PayloadAttestationCache
slot1 := primitives.Slot(1)
slot2 := primitives.Slot(2)
idx1 := primitives.ValidatorIndex(3)
idx2 := primitives.ValidatorIndex(4)
require.False(t, c.Seen(slot1, idx1))
require.NoError(t, c.Add(slot1, idx1))
require.True(t, c.Seen(slot1, idx1))
require.False(t, c.Seen(slot1, idx2))
require.False(t, c.Seen(slot2, idx1))
require.NoError(t, c.Add(slot1, idx2))
require.True(t, c.Seen(slot1, idx1))
require.True(t, c.Seen(slot1, idx2))
require.NoError(t, c.Add(slot2, idx1))
require.True(t, c.Seen(slot2, idx1))
require.False(t, c.Seen(slot1, idx1))
require.False(t, c.Seen(slot1, idx2))
}
func TestPayloadAttestationCache_Clear(t *testing.T) {
var c cache.PayloadAttestationCache
slot := primitives.Slot(10)
idx := primitives.ValidatorIndex(42)
require.NoError(t, c.Add(slot, idx))
require.True(t, c.Seen(slot, idx))
c.Clear()
require.False(t, c.Seen(slot, idx))
require.NoError(t, c.Add(slot, idx))
require.True(t, c.Seen(slot, idx))
}

View File

@@ -20,6 +20,7 @@ go_library(
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/epoch:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/gloas:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
@@ -75,7 +76,11 @@ func ProcessAttestationNoVerifySignature(
return nil, err
}
return SetParticipationAndRewardProposer(ctx, beaconState, att.GetData().Target.Epoch, indices, participatedFlags, totalBalance)
if err := beaconState.UpdatePendingPaymentWeight(att, indices, participatedFlags); err != nil {
return nil, errors.Wrap(err, "failed to update pending payment weight")
}
return SetParticipationAndRewardProposer(ctx, beaconState, att.GetData().Target.Epoch, indices, participatedFlags, totalBalance, att)
}
// SetParticipationAndRewardProposer retrieves and sets the epoch participation bits in state. Based on the epoch participation, it rewards
@@ -105,7 +110,9 @@ func SetParticipationAndRewardProposer(
beaconState state.BeaconState,
targetEpoch primitives.Epoch,
indices []uint64,
participatedFlags map[uint8]bool, totalBalance uint64) (state.BeaconState, error) {
participatedFlags map[uint8]bool,
totalBalance uint64,
att ethpb.Att) (state.BeaconState, error) {
var proposerRewardNumerator uint64
currentEpoch := time.CurrentEpoch(beaconState)
var stateErr error
@@ -299,6 +306,19 @@ func AttestationParticipationFlagIndices(beaconState state.ReadOnlyBeaconState,
participatedFlags[targetFlagIndex] = true
}
matchedSrcTgtHead := matchedHead && matchedSrcTgt
var beaconBlockRoot [32]byte
copy(beaconBlockRoot[:], data.BeaconBlockRoot)
matchingPayload, err := gloas.MatchingPayload(
beaconState,
beaconBlockRoot,
data.Slot,
uint64(data.CommitteeIndex),
)
if err != nil {
return nil, err
}
matchedSrcTgtHead = matchedSrcTgtHead && matchingPayload
if matchedSrcTgtHead && delay == cfg.MinAttestationInclusionDelay {
participatedFlags[headFlagIndex] = true
}

View File

@@ -1,7 +1,9 @@
package altair_test
import (
"bytes"
"fmt"
"reflect"
"testing"
"github.com/OffchainLabs/go-bitfield"
@@ -556,7 +558,7 @@ func TestSetParticipationAndRewardProposer(t *testing.T) {
b, err := helpers.TotalActiveBalance(beaconState)
require.NoError(t, err)
st, err := altair.SetParticipationAndRewardProposer(t.Context(), beaconState, test.epoch, test.indices, test.participatedFlags, b)
st, err := altair.SetParticipationAndRewardProposer(t.Context(), beaconState, test.epoch, test.indices, test.participatedFlags, b, &ethpb.Attestation{})
require.NoError(t, err)
i, err := helpers.BeaconProposerIndex(t.Context(), st)
@@ -775,11 +777,67 @@ func TestAttestationParticipationFlagIndices(t *testing.T) {
headFlagIndex: true,
},
},
{
name: "gloas same-slot committee index non-zero errors",
inputState: func() state.BeaconState {
stateSlot := primitives.Slot(5)
slot := primitives.Slot(3)
targetRoot := bytes.Repeat([]byte{0xAA}, 32)
headRoot := bytes.Repeat([]byte{0xBB}, 32)
prevRoot := bytes.Repeat([]byte{0xCC}, 32)
return buildGloasStateForFlags(t, stateSlot, slot, targetRoot, headRoot, prevRoot, 0, 0)
}(),
inputData: &ethpb.AttestationData{
Slot: 3,
CommitteeIndex: 1, // invalid for same-slot
BeaconBlockRoot: bytes.Repeat([]byte{0xBB}, 32),
Source: &ethpb.Checkpoint{Root: bytes.Repeat([]byte{0xDD}, 32)},
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: bytes.Repeat([]byte{0xAA}, 32),
},
},
inputDelay: 1,
participationIndices: nil,
},
{
name: "gloas payload availability matches committee index",
inputState: func() state.BeaconState {
stateSlot := primitives.Slot(5)
slot := primitives.Slot(3)
targetRoot := bytes.Repeat([]byte{0xAA}, 32)
headRoot := bytes.Repeat([]byte{0xBB}, 32)
// Same prev root to make SameSlotAttestation false and use payload availability.
return buildGloasStateForFlags(t, stateSlot, slot, targetRoot, headRoot, headRoot, 1, slot)
}(),
inputData: &ethpb.AttestationData{
Slot: 3,
CommitteeIndex: 1,
BeaconBlockRoot: bytes.Repeat([]byte{0xBB}, 32),
Source: &ethpb.Checkpoint{Root: bytes.Repeat([]byte{0xDD}, 32)},
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: bytes.Repeat([]byte{0xAA}, 32),
},
},
inputDelay: 1,
participationIndices: map[uint8]bool{
sourceFlagIndex: true,
targetFlagIndex: true,
headFlagIndex: true,
},
},
}
for _, test := range tests {
flagIndices, err := altair.AttestationParticipationFlagIndices(test.inputState, test.inputData, test.inputDelay)
if test.participationIndices == nil {
require.ErrorContains(t, "committee index", err)
continue
}
require.NoError(t, err)
require.DeepEqual(t, test.participationIndices, flagIndices)
if !reflect.DeepEqual(test.participationIndices, flagIndices) {
t.Fatalf("unexpected participation indices: got %v want %v", flagIndices, test.participationIndices)
}
}
}
@@ -858,3 +916,61 @@ func TestMatchingStatus(t *testing.T) {
require.Equal(t, test.matchedHead, head)
}
}
func buildGloasStateForFlags(t *testing.T, stateSlot, slot primitives.Slot, targetRoot, headRoot, prevRoot []byte, availabilityBit uint8, availabilitySlot primitives.Slot) state.BeaconState {
t.Helper()
cfg := params.BeaconConfig()
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
blockRoots[0] = targetRoot
blockRoots[slot%cfg.SlotsPerHistoricalRoot] = headRoot
blockRoots[(slot-1)%cfg.SlotsPerHistoricalRoot] = prevRoot
stateRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range stateRoots {
stateRoots[i] = make([]byte, fieldparams.RootLength)
}
randaoMixes := make([][]byte, cfg.EpochsPerHistoricalVector)
for i := range randaoMixes {
randaoMixes[i] = make([]byte, fieldparams.RootLength)
}
execPayloadAvailability := make([]byte, cfg.SlotsPerHistoricalRoot/8)
idx := availabilitySlot % cfg.SlotsPerHistoricalRoot
byteIndex := idx / 8
bitIndex := idx % 8
if availabilityBit == 1 {
execPayloadAvailability[byteIndex] |= 1 << bitIndex
}
checkpointRoot := bytes.Repeat([]byte{0xDD}, fieldparams.RootLength)
justified := &ethpb.Checkpoint{Root: checkpointRoot}
stProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
GenesisValidatorsRoot: bytes.Repeat([]byte{0x11}, fieldparams.RootLength),
BlockRoots: blockRoots,
StateRoots: stateRoots,
RandaoMixes: randaoMixes,
ExecutionPayloadAvailability: execPayloadAvailability,
CurrentJustifiedCheckpoint: justified,
PreviousJustifiedCheckpoint: justified,
Validators: []*ethpb.Validator{
{
EffectiveBalance: cfg.MinActivationBalance,
WithdrawalCredentials: append([]byte{cfg.ETH1AddressWithdrawalPrefixByte}, bytes.Repeat([]byte{0x01}, 31)...),
},
},
Balances: []uint64{cfg.MinActivationBalance},
BuilderPendingPayments: make([]*ethpb.BuilderPendingPayment, cfg.SlotsPerEpoch*2),
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x01}, 4),
PreviousVersion: bytes.Repeat([]byte{0x01}, 4),
Epoch: 0,
},
}
beaconState, err := state_native.InitializeFromProtoGloas(stProto)
require.NoError(t, err)
return beaconState
}

View File

@@ -111,10 +111,21 @@ func VerifyAttestationNoVerifySignature(
var indexedAtt ethpb.IndexedAtt
if att.Version() >= version.Electra {
if att.GetData().CommitteeIndex != 0 {
return errors.New("committee index must be 0 post-Electra")
ci := att.GetData().CommitteeIndex
// Spec v1.7.0-alpha pseudocode:
//
// # [Modified in Gloas:EIP7732]
// assert data.index < 2
//
if beaconState.Version() >= version.Gloas {
if ci >= 2 {
return fmt.Errorf("incorrect committee index %d", ci)
}
} else {
if ci != 0 {
return errors.New("committee index must be 0 between Electra and Gloas forks")
}
}
aggBits := att.GetAggregationBits()
committeeIndices := att.CommitteeBitsVal().BitIndices()
committees := make([][]primitives.ValidatorIndex, len(committeeIndices))

View File

@@ -1,6 +1,7 @@
package blocks_test
import (
"bytes"
"context"
"testing"
@@ -262,7 +263,7 @@ func TestVerifyAttestationNoVerifySignature_Electra(t *testing.T) {
CommitteeBits: bitfield.NewBitvector64(),
}
err = blocks.VerifyAttestationNoVerifySignature(context.TODO(), beaconState, att)
assert.ErrorContains(t, "committee index must be 0 post-Electra", err)
assert.ErrorContains(t, "committee index must be 0", err)
})
t.Run("index of committee too big", func(t *testing.T) {
aggBits := bitfield.NewBitlist(3)
@@ -314,6 +315,75 @@ func TestVerifyAttestationNoVerifySignature_Electra(t *testing.T) {
})
}
func TestVerifyAttestationNoVerifySignature_GloasCommitteeIndexLimit(t *testing.T) {
cfg := params.BeaconConfig()
stateSlot := cfg.MinAttestationInclusionDelay + 1
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range blockRoots {
blockRoots[i] = make([]byte, fieldparams.RootLength)
}
stateRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range stateRoots {
stateRoots[i] = make([]byte, fieldparams.RootLength)
}
randaoMixes := make([][]byte, cfg.EpochsPerHistoricalVector)
for i := range randaoMixes {
randaoMixes[i] = make([]byte, fieldparams.RootLength)
}
checkpointRoot := bytes.Repeat([]byte{0xAA}, fieldparams.RootLength)
justified := &ethpb.Checkpoint{Epoch: 0, Root: checkpointRoot}
gloasStateProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
GenesisValidatorsRoot: bytes.Repeat([]byte{0x11}, fieldparams.RootLength),
BlockRoots: blockRoots,
StateRoots: stateRoots,
RandaoMixes: randaoMixes,
ExecutionPayloadAvailability: make([]byte, cfg.SlotsPerHistoricalRoot/8),
CurrentJustifiedCheckpoint: justified,
PreviousJustifiedCheckpoint: justified,
Validators: []*ethpb.Validator{
{
EffectiveBalance: cfg.MinActivationBalance,
WithdrawalCredentials: append([]byte{cfg.ETH1AddressWithdrawalPrefixByte}, bytes.Repeat([]byte{0x01}, 31)...),
},
},
Balances: []uint64{cfg.MinActivationBalance},
BuilderPendingPayments: make([]*ethpb.BuilderPendingPayment, cfg.SlotsPerEpoch*2),
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x01}, 4),
PreviousVersion: bytes.Repeat([]byte{0x01}, 4),
Epoch: 0,
},
}
beaconState, err := state_native.InitializeFromProtoGloas(gloasStateProto)
require.NoError(t, err)
committeeBits := bitfield.NewBitvector64()
committeeBits.SetBitAt(0, true)
aggBits := bitfield.NewBitlist(1)
aggBits.SetBitAt(0, true)
att := &ethpb.AttestationElectra{
Data: &ethpb.AttestationData{
Slot: 0,
CommitteeIndex: 2, // invalid for Gloas (must be <2)
BeaconBlockRoot: blockRoots[0],
Source: justified,
Target: justified,
},
AggregationBits: aggBits,
CommitteeBits: committeeBits,
Signature: bytes.Repeat([]byte{0x00}, fieldparams.BLSSignatureLength),
}
err = blocks.VerifyAttestationNoVerifySignature(context.TODO(), beaconState, att)
assert.ErrorContains(t, "incorrect committee index 2", err)
}
func TestConvertToIndexed_OK(t *testing.T) {
helpers.ClearCache()
validators := make([]*ethpb.Validator, 2*params.BeaconConfig().SlotsPerEpoch)
@@ -583,6 +653,7 @@ func TestVerifyAttestations_HandlesPlannedFork(t *testing.T) {
}
func TestRetrieveAttestationSignatureSet_VerifiesMultipleAttestations(t *testing.T) {
helpers.ClearCache()
ctx := t.Context()
numOfValidators := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(4))
validators := make([]*ethpb.Validator, numOfValidators)

View File

@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"attestation.go",
"bid.go",
"payload_attestation.go",
"pending_payment.go",
@@ -26,6 +27,7 @@ go_library(
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
@@ -34,6 +36,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"attestation_test.go",
"bid_test.go",
"payload_attestation_test.go",
"pending_payment_test.go",

View File

@@ -0,0 +1,52 @@
package gloas
import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/pkg/errors"
)
// MatchingPayload returns true if the attestation's committee index matches the expected payload index.
//
// For pre-Gloas forks, this always returns true.
//
// Spec v1.7.0-alpha (pseudocode):
//
// # [New in Gloas:EIP7732]
// if is_attestation_same_slot(state, data):
// assert data.index == 0
// payload_matches = True
// else:
// slot_index = data.slot % SLOTS_PER_HISTORICAL_ROOT
// payload_index = state.execution_payload_availability[slot_index]
// payload_matches = data.index == payload_index
func MatchingPayload(
beaconState state.ReadOnlyBeaconState,
beaconBlockRoot [32]byte,
slot primitives.Slot,
committeeIndex uint64,
) (bool, error) {
if beaconState.Version() < version.Gloas {
return true, nil
}
sameSlot, err := beaconState.IsAttestationSameSlot(beaconBlockRoot, slot)
if err != nil {
return false, errors.Wrap(err, "failed to get same slot attestation status")
}
if sameSlot {
if committeeIndex != 0 {
return false, fmt.Errorf("committee index %d for same slot attestation must be 0", committeeIndex)
}
return true, nil
}
executionPayloadAvail, err := beaconState.ExecutionPayloadAvailability(slot)
if err != nil {
return false, errors.Wrap(err, "failed to get execution payload availability status")
}
return executionPayloadAvail == committeeIndex, nil
}

View File

@@ -0,0 +1,110 @@
package gloas
import (
"bytes"
"testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func buildStateWithBlockRoots(t *testing.T, stateSlot primitives.Slot, roots map[primitives.Slot][]byte) *state_native.BeaconState {
t.Helper()
cfg := params.BeaconConfig()
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for slot, root := range roots {
blockRoots[slot%cfg.SlotsPerHistoricalRoot] = root
}
stProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
BlockRoots: blockRoots,
}
state, err := state_native.InitializeFromProtoGloas(stProto)
require.NoError(t, err)
return state.(*state_native.BeaconState)
}
func TestMatchingPayload(t *testing.T) {
t.Run("pre-gloas always true", func(t *testing.T) {
stIface, err := state_native.InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
require.NoError(t, err)
ok, err := MatchingPayload(stIface, [32]byte{}, 0, 123)
require.NoError(t, err)
require.Equal(t, true, ok)
})
t.Run("same slot requires committee index 0", func(t *testing.T) {
root := bytes.Repeat([]byte{0xAA}, 32)
state := buildStateWithBlockRoots(t, 6, map[primitives.Slot][]byte{
4: root,
3: bytes.Repeat([]byte{0xBB}, 32),
})
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 1)
require.ErrorContains(t, "committee index", err)
require.Equal(t, false, ok)
})
t.Run("same slot matches when committee index is 0", func(t *testing.T) {
root := bytes.Repeat([]byte{0xAA}, 32)
state := buildStateWithBlockRoots(t, 6, map[primitives.Slot][]byte{
4: root,
3: bytes.Repeat([]byte{0xBB}, 32),
})
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 0)
require.NoError(t, err)
require.Equal(t, true, ok)
})
t.Run("non same slot checks payload availability", func(t *testing.T) {
cfg := params.BeaconConfig()
root := bytes.Repeat([]byte{0xAA}, 32)
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
blockRoots[4%cfg.SlotsPerHistoricalRoot] = bytes.Repeat([]byte{0xCC}, 32)
blockRoots[3%cfg.SlotsPerHistoricalRoot] = bytes.Repeat([]byte{0xBB}, 32)
availability := make([]byte, cfg.SlotsPerHistoricalRoot/8)
slotIndex := uint64(4)
availability[slotIndex/8] = byte(1 << (slotIndex % 8))
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Slot: 6,
BlockRoots: blockRoots,
ExecutionPayloadAvailability: availability,
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x66}, 4),
PreviousVersion: bytes.Repeat([]byte{0x66}, 4),
Epoch: 0,
},
})
require.NoError(t, err)
state := stIface.(*state_native.BeaconState)
require.Equal(t, version.Gloas, state.Version())
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 1)
require.NoError(t, err)
require.Equal(t, true, ok)
ok, err = MatchingPayload(state, rootArr, 4, 0)
require.NoError(t, err)
require.Equal(t, false, ok)
})
}

View File

@@ -17,27 +17,56 @@ import (
)
// ProcessExecutionPayloadBid processes a signed execution payload bid in the Gloas fork.
// Spec v1.7.0-alpha.0 (pseudocode):
// process_execution_payload_bid(state: BeaconState, block: BeaconBlock):
//
// signed_bid = block.body.signed_execution_payload_bid
// bid = signed_bid.message
// builder_index = bid.builder_index
// amount = bid.value
// if builder_index == BUILDER_INDEX_SELF_BUILD:
// assert amount == 0
// assert signed_bid.signature == G2_POINT_AT_INFINITY
// else:
// assert is_active_builder(state, builder_index)
// assert can_builder_cover_bid(state, builder_index, amount)
// assert verify_execution_payload_bid_signature(state, signed_bid)
// assert bid.slot == block.slot
// assert bid.parent_block_hash == state.latest_block_hash
// assert bid.parent_block_root == block.parent_root
// assert bid.prev_randao == get_randao_mix(state, get_current_epoch(state))
// if amount > 0:
// state.builder_pending_payments[...] = BuilderPendingPayment(weight=0, withdrawal=BuilderPendingWithdrawal(fee_recipient=bid.fee_recipient, amount=amount, builder_index=builder_index))
// state.latest_execution_payload_bid = bid
// <spec fn="process_execution_payload_bid" fork="gloas" hash="823c9f3a">
// def process_execution_payload_bid(state: BeaconState, block: BeaconBlock) -> None:
// signed_bid = block.body.signed_execution_payload_bid
// bid = signed_bid.message
// builder_index = bid.builder_index
// amount = bid.value
//
// # For self-builds, amount must be zero regardless of withdrawal credential prefix
// if builder_index == BUILDER_INDEX_SELF_BUILD:
// assert amount == 0
// assert signed_bid.signature == bls.G2_POINT_AT_INFINITY
// else:
// # Verify that the builder is active
// assert is_active_builder(state, builder_index)
// # Verify that the builder has funds to cover the bid
// assert can_builder_cover_bid(state, builder_index, amount)
// # Verify that the bid signature is valid
// assert verify_execution_payload_bid_signature(state, signed_bid)
//
// # Verify commitments are under limit
// assert (
// len(bid.blob_kzg_commitments)
// <= get_blob_parameters(get_current_epoch(state)).max_blobs_per_block
// )
//
// # Verify that the bid is for the current slot
// assert bid.slot == block.slot
// # Verify that the bid is for the right parent block
// assert bid.parent_block_hash == state.latest_block_hash
// assert bid.parent_block_root == block.parent_root
// assert bid.prev_randao == get_randao_mix(state, get_current_epoch(state))
//
// # Record the pending payment if there is some payment
// if amount > 0:
// pending_payment = BuilderPendingPayment(
// weight=0,
// withdrawal=BuilderPendingWithdrawal(
// fee_recipient=bid.fee_recipient,
// amount=amount,
// builder_index=builder_index,
// ),
// )
// state.builder_pending_payments[SLOTS_PER_EPOCH + bid.slot % SLOTS_PER_EPOCH] = (
// pending_payment
// )
//
// # Cache the signed execution payload bid
// state.latest_execution_payload_bid = bid
// </spec>
func ProcessExecutionPayloadBid(st state.BeaconState, block interfaces.ReadOnlyBeaconBlock) error {
signedBid, err := block.Body().SignedExecutionPayloadBid()
if err != nil {
@@ -86,6 +115,12 @@ func ProcessExecutionPayloadBid(st state.BeaconState, block interfaces.ReadOnlyB
}
}
maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(block.Slot()))
commitmentCount := bid.BlobKzgCommitmentCount()
if commitmentCount > uint64(maxBlobsPerBlock) {
return fmt.Errorf("bid has %d blob KZG commitments over max %d", commitmentCount, maxBlobsPerBlock)
}
if err := validateBidConsistency(st, bid, block); err != nil {
return errors.Wrap(err, "bid consistency validation failed")
}

View File

@@ -184,6 +184,28 @@ func signBid(t *testing.T, sk common.SecretKey, bid *ethpb.ExecutionPayloadBid,
return out
}
func blobCommitmentsForSlot(slot primitives.Slot, count int) [][]byte {
max := int(params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(slot)))
if count > max {
count = max
}
commitments := make([][]byte, count)
for i := range commitments {
commitments[i] = bytes.Repeat([]byte{0xEE}, 48)
}
return commitments
}
func tooManyBlobCommitmentsForSlot(slot primitives.Slot) [][]byte {
max := int(params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(slot)))
count := max + 1
commitments := make([][]byte, count)
for i := range commitments {
commitments[i] = bytes.Repeat([]byte{0xEE}, 48)
}
return commitments
}
func TestProcessExecutionPayloadBid_SelfBuildSuccess(t *testing.T) {
slot := primitives.Slot(12)
proposerIdx := primitives.ValidatorIndex(0)
@@ -194,17 +216,17 @@ func TestProcessExecutionPayloadBid_SelfBuildSuccess(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 0,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 0,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
@@ -236,16 +258,16 @@ func TestProcessExecutionPayloadBid_SelfBuildNonZeroAmountFails(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, [48]byte{})
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xCC}, 32),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
@@ -280,17 +302,17 @@ func TestProcessExecutionPayloadBid_PendingPaymentAndCacheBid(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, balance, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 500_000,
ExecutionPayment: 1,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 500_000,
ExecutionPayment: 1,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
@@ -341,17 +363,17 @@ func TestProcessExecutionPayloadBid_BuilderNotActive(t *testing.T) {
state = stateIface.(*state_native.BeaconState)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x03}, 32),
BlockHash: bytes.Repeat([]byte{0x04}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x05}, 32),
FeeRecipient: bytes.Repeat([]byte{0x06}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x03}, 32),
BlockHash: bytes.Repeat([]byte{0x04}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x06}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -394,17 +416,17 @@ func TestProcessExecutionPayloadBid_CannotCoverBid(t *testing.T) {
state = stateIface.(*state_native.BeaconState)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 25,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 25,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -436,17 +458,17 @@ func TestProcessExecutionPayloadBid_InvalidSignature(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
// Use an invalid signature.
invalidSig := [96]byte{1}
@@ -463,6 +485,42 @@ func TestProcessExecutionPayloadBid_InvalidSignature(t *testing.T) {
require.ErrorContains(t, "bid signature validation failed", err)
}
func TestProcessExecutionPayloadBid_TooManyBlobCommitments(t *testing.T) {
slot := primitives.Slot(9)
proposerIdx := primitives.ValidatorIndex(0)
builderIdx := params.BeaconConfig().BuilderIndexSelfBuild
randao := [32]byte(bytes.Repeat([]byte{0xAA}, 32))
latestHash := [32]byte(bytes.Repeat([]byte{0xBB}, 32))
pubKey := [48]byte{}
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
BlobKzgCommitments: tooManyBlobCommitmentsForSlot(slot),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
Signature: common.InfiniteSignature[:],
}
block := stubBlock{
slot: slot,
proposer: proposerIdx,
parentRoot: bytesutil.ToBytes32(bid.ParentBlockRoot),
body: stubBlockBody{signedBid: signed},
v: version.Gloas,
}
err := ProcessExecutionPayloadBid(state, block)
require.ErrorContains(t, "blob KZG commitments over max", err)
}
func TestProcessExecutionPayloadBid_SlotMismatch(t *testing.T) {
slot := primitives.Slot(10)
builderIdx := primitives.BuilderIndex(1)
@@ -478,17 +536,17 @@ func TestProcessExecutionPayloadBid_SlotMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot + 1, // mismatch
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xCC}, 32),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot + 1, // mismatch
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -520,17 +578,17 @@ func TestProcessExecutionPayloadBid_ParentHashMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: bytes.Repeat([]byte{0x11}, 32), // mismatch
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: bytes.Repeat([]byte{0x11}, 32), // mismatch
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -563,17 +621,17 @@ func TestProcessExecutionPayloadBid_ParentRootMismatch(t *testing.T) {
parentRoot := bytes.Repeat([]byte{0x22}, 32)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: parentRoot,
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: parentRoot,
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -605,17 +663,17 @@ func TestProcessExecutionPayloadBid_PrevRandaoMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: bytes.Repeat([]byte{0x01}, 32), // mismatch
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: bytes.Repeat([]byte{0x01}, 32), // mismatch
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)

View File

@@ -24,14 +24,21 @@ import (
)
// ProcessPayloadAttestations validates payload attestations in a block body.
// Spec v1.7.0-alpha.0 (pseudocode):
// process_payload_attestation(state: BeaconState, payload_attestation: PayloadAttestation):
//
// data = payload_attestation.data
// assert data.beacon_block_root == state.latest_block_header.parent_root
// assert data.slot + 1 == state.slot
// indexed = get_indexed_payload_attestation(state, data.slot, payload_attestation)
// assert is_valid_indexed_payload_attestation(state, indexed)
// <spec fn="process_payload_attestation" fork="gloas" hash="f46bf0b0">
// def process_payload_attestation(
// state: BeaconState, payload_attestation: PayloadAttestation
// ) -> None:
// data = payload_attestation.data
//
// # Check that the attestation is for the parent beacon block
// assert data.beacon_block_root == state.latest_block_header.parent_root
// # Check that the attestation is for the previous slot
// assert data.slot + 1 == state.slot
// # Verify signature
// indexed_payload_attestation = get_indexed_payload_attestation(state, payload_attestation)
// assert is_valid_indexed_payload_attestation(state, indexed_payload_attestation)
// </spec>
func ProcessPayloadAttestations(ctx context.Context, st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) error {
atts, err := body.PayloadAttestations()
if err != nil {
@@ -70,7 +77,7 @@ func ProcessPayloadAttestations(ctx context.Context, st state.BeaconState, body
// indexedPayloadAttestation converts a payload attestation into its indexed form.
func indexedPayloadAttestation(ctx context.Context, st state.ReadOnlyBeaconState, att *eth.PayloadAttestation) (*consensus_types.IndexedPayloadAttestation, error) {
committee, err := payloadCommittee(ctx, st, att.Data.Slot)
committee, err := PayloadCommittee(ctx, st, att.Data.Slot)
if err != nil {
return nil, err
}
@@ -89,19 +96,26 @@ func indexedPayloadAttestation(ctx context.Context, st state.ReadOnlyBeaconState
}, nil
}
// payloadCommittee returns the payload timeliness committee for a given slot for the state.
// Spec v1.7.0-alpha.0 (pseudocode):
// get_ptc(state: BeaconState, slot: Slot) -> Vector[ValidatorIndex, PTC_SIZE]:
// PayloadCommittee returns the payload timeliness committee for a given slot for the state.
//
// epoch = compute_epoch_at_slot(slot)
// seed = hash(get_seed(state, epoch, DOMAIN_PTC_ATTESTER) + uint_to_bytes(slot))
// indices = []
// committees_per_slot = get_committee_count_per_slot(state, epoch)
// for i in range(committees_per_slot):
// committee = get_beacon_committee(state, slot, CommitteeIndex(i))
// indices.extend(committee)
// return compute_balance_weighted_selection(state, indices, seed, size=PTC_SIZE, shuffle_indices=False)
func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot primitives.Slot) ([]primitives.ValidatorIndex, error) {
// <spec fn="get_ptc" fork="gloas" hash="ae15f761">
// def get_ptc(state: BeaconState, slot: Slot) -> Vector[ValidatorIndex, PTC_SIZE]:
// """
// Get the payload timeliness committee for the given ``slot``.
// """
// epoch = compute_epoch_at_slot(slot)
// seed = hash(get_seed(state, epoch, DOMAIN_PTC_ATTESTER) + uint_to_bytes(slot))
// indices: List[ValidatorIndex] = []
// # Concatenate all committees for this slot in order
// committees_per_slot = get_committee_count_per_slot(state, epoch)
// for i in range(committees_per_slot):
// committee = get_beacon_committee(state, slot, CommitteeIndex(i))
// indices.extend(committee)
// return compute_balance_weighted_selection(
// state, indices, seed, size=PTC_SIZE, shuffle_indices=False
// )
// </spec>
func PayloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot primitives.Slot) ([]primitives.ValidatorIndex, error) {
epoch := slots.ToEpoch(slot)
seed, err := ptcSeed(st, epoch, slot)
if err != nil {
@@ -152,17 +166,35 @@ func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitiv
}
// selectByBalance selects a balance-weighted subset of input candidates.
// Spec v1.7.0-alpha.0 (pseudocode):
// compute_balance_weighted_selection(state, indices, seed, size, shuffle_indices):
// Note: shuffle_indices is false for PTC.
//
// total = len(indices); selected = []; i = 0
// while len(selected) < size:
// next = i % total
// if shuffle_indices: next = compute_shuffled_index(next, total, seed)
// if compute_balance_weighted_acceptance(state, indices[next], seed, i):
// selected.append(indices[next])
// i += 1
// <spec fn="compute_balance_weighted_selection" fork="gloas" hash="2c9f1c23">
// def compute_balance_weighted_selection(
// state: BeaconState,
// indices: Sequence[ValidatorIndex],
// seed: Bytes32,
// size: uint64,
// shuffle_indices: bool,
// ) -> Sequence[ValidatorIndex]:
// """
// Return ``size`` indices sampled by effective balance, using ``indices``
// as candidates. If ``shuffle_indices`` is ``True``, candidate indices
// are themselves sampled from ``indices`` by shuffling it, otherwise
// ``indices`` is traversed in order.
// """
// total = uint64(len(indices))
// assert total > 0
// selected: List[ValidatorIndex] = []
// i = uint64(0)
// while len(selected) < size:
// next_index = i % total
// if shuffle_indices:
// next_index = compute_shuffled_index(next_index, total, seed)
// candidate_index = indices[next_index]
// if compute_balance_weighted_acceptance(state, candidate_index, seed, i):
// selected.append(candidate_index)
// i += 1
// return selected
// </spec>
func selectByBalanceFill(
ctx context.Context,
st state.ReadOnlyBeaconState,
@@ -199,15 +231,22 @@ func selectByBalanceFill(
}
// acceptByBalance determines if a validator is accepted based on its effective balance.
// Spec v1.7.0-alpha.0 (pseudocode):
// compute_balance_weighted_acceptance(state, index, seed, i):
//
// MAX_RANDOM_VALUE = 2**16 - 1
// random_bytes = hash(seed + uint_to_bytes(i // 16))
// offset = i % 16 * 2
// random_value = bytes_to_uint64(random_bytes[offset:offset+2])
// effective_balance = state.validators[index].effective_balance
// return effective_balance * MAX_RANDOM_VALUE >= MAX_EFFECTIVE_BALANCE_ELECTRA * random_value
// <spec fn="compute_balance_weighted_acceptance" fork="gloas" hash="9954dcd0">
// def compute_balance_weighted_acceptance(
// state: BeaconState, index: ValidatorIndex, seed: Bytes32, i: uint64
// ) -> bool:
// """
// Return whether to accept the selection of the validator ``index``, with probability
// proportional to its ``effective_balance``, and randomness given by ``seed`` and ``i``.
// """
// MAX_RANDOM_VALUE = 2**16 - 1
// random_bytes = hash(seed + uint_to_bytes(i // 16))
// offset = i % 16 * 2
// random_value = bytes_to_uint64(random_bytes[offset : offset + 2])
// effective_balance = state.validators[index].effective_balance
// return effective_balance * MAX_RANDOM_VALUE >= MAX_EFFECTIVE_BALANCE_ELECTRA * random_value
// </spec>
func acceptByBalance(st state.ReadOnlyBeaconState, idx primitives.ValidatorIndex, seedBuf []byte, hashFunc func([]byte) [32]byte, maxBalance uint64, round uint64) (bool, error) {
// Reuse the seed buffer by overwriting the last 8 bytes with the round counter.
binary.LittleEndian.PutUint64(seedBuf[len(seedBuf)-8:], round/16)
@@ -224,16 +263,26 @@ func acceptByBalance(st state.ReadOnlyBeaconState, idx primitives.ValidatorIndex
}
// validIndexedPayloadAttestation verifies the signature of an indexed payload attestation.
// Spec v1.7.0-alpha.0 (pseudocode):
// is_valid_indexed_payload_attestation(state: BeaconState, indexed_payload_attestation: IndexedPayloadAttestation) -> bool:
//
// indices = indexed_payload_attestation.attesting_indices
// return len(indices) > 0 and indices == sorted(indices) and
// bls.FastAggregateVerify(
// [state.validators[i].pubkey for i in indices],
// compute_signing_root(indexed_payload_attestation.data, get_domain(state, DOMAIN_PTC_ATTESTER, compute_epoch_at_slot(attestation.data.slot)),
// indexed_payload_attestation.signature,
// )
// <spec fn="is_valid_indexed_payload_attestation" fork="gloas" hash="d76e0f89">
// def is_valid_indexed_payload_attestation(
// state: BeaconState, attestation: IndexedPayloadAttestation
// ) -> bool:
// """
// Check if ``attestation`` is non-empty, has sorted indices, and has
// a valid aggregate signature.
// """
// # Verify indices are non-empty and sorted
// indices = attestation.attesting_indices
// if len(indices) == 0 or not indices == sorted(indices):
// return False
//
// # Verify aggregate signature
// pubkeys = [state.validators[i].pubkey for i in indices]
// domain = get_domain(state, DOMAIN_PTC_ATTESTER, compute_epoch_at_slot(attestation.data.slot))
// signing_root = compute_signing_root(attestation.data, domain)
// return bls.FastAggregateVerify(pubkeys, signing_root, attestation.signature)
// </spec>
func validIndexedPayloadAttestation(st state.ReadOnlyBeaconState, att *consensus_types.IndexedPayloadAttestation) error {
indices := att.AttestingIndices
if len(indices) == 0 || !slices.IsSorted(indices) {

View File

@@ -10,17 +10,21 @@ import (
)
// ProcessBuilderPendingPayments processes the builder pending payments from the previous epoch.
// Spec v1.7.0-alpha.0 (pseudocode):
// def process_builder_pending_payments(state: BeaconState) -> None:
//
// quorum = get_builder_payment_quorum_threshold(state)
// for payment in state.builder_pending_payments[:SLOTS_PER_EPOCH]:
// if payment.weight >= quorum:
// state.builder_pending_withdrawals.append(payment.withdrawal)
// <spec fn="process_builder_pending_payments" fork="gloas" hash="10da48dd">
// def process_builder_pending_payments(state: BeaconState) -> None:
// """
// Processes the builder pending payments from the previous epoch.
// """
// quorum = get_builder_payment_quorum_threshold(state)
// for payment in state.builder_pending_payments[:SLOTS_PER_EPOCH]:
// if payment.weight >= quorum:
// state.builder_pending_withdrawals.append(payment.withdrawal)
//
// old_payments = state.builder_pending_payments[SLOTS_PER_EPOCH:]
// new_payments = [BuilderPendingPayment() for _ in range(SLOTS_PER_EPOCH)]
// state.builder_pending_payments = old_payments + new_payments
// old_payments = state.builder_pending_payments[SLOTS_PER_EPOCH:]
// new_payments = [BuilderPendingPayment() for _ in range(SLOTS_PER_EPOCH)]
// state.builder_pending_payments = old_payments + new_payments
// </spec>
func ProcessBuilderPendingPayments(state state.BeaconState) error {
quorum, err := builderQuorumThreshold(state)
if err != nil {
@@ -53,12 +57,16 @@ func ProcessBuilderPendingPayments(state state.BeaconState) error {
}
// builderQuorumThreshold calculates the quorum threshold for builder payments.
// Spec v1.7.0-alpha.0 (pseudocode):
// def get_builder_payment_quorum_threshold(state: BeaconState) -> uint64:
//
// per_slot_balance = get_total_active_balance(state) // SLOTS_PER_EPOCH
// quorum = per_slot_balance * BUILDER_PAYMENT_THRESHOLD_NUMERATOR
// return uint64(quorum // BUILDER_PAYMENT_THRESHOLD_DENOMINATOR)
// <spec fn="get_builder_payment_quorum_threshold" fork="gloas" hash="a64b7ffb">
// def get_builder_payment_quorum_threshold(state: BeaconState) -> uint64:
// """
// Calculate the quorum threshold for builder payments.
// """
// per_slot_balance = get_total_active_balance(state) // SLOTS_PER_EPOCH
// quorum = per_slot_balance * BUILDER_PAYMENT_THRESHOLD_NUMERATOR
// return uint64(quorum // BUILDER_PAYMENT_THRESHOLD_DENOMINATOR)
// </spec>
func builderQuorumThreshold(state state.ReadOnlyBeaconState) (primitives.Gwei, error) {
activeBalance, err := helpers.TotalActiveBalance(state)
if err != nil {

View File

@@ -11,16 +11,20 @@ import (
)
// RemoveBuilderPendingPayment removes the pending builder payment for the proposal slot.
// Spec v1.7.0 (pseudocode):
//
// <spec fn="process_proposer_slashing" fork="gloas" lines="22-32" hash="4da721ef">
// # [New in Gloas:EIP7732]
// # Remove the BuilderPendingPayment corresponding to
// # this proposal if it is still in the 2-epoch window.
// slot = header_1.slot
// proposal_epoch = compute_epoch_at_slot(slot)
// if proposal_epoch == get_current_epoch(state):
// payment_index = SLOTS_PER_EPOCH + slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// payment_index = SLOTS_PER_EPOCH + slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// elif proposal_epoch == get_previous_epoch(state):
// payment_index = slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// payment_index = slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// </spec>
func RemoveBuilderPendingPayment(st state.BeaconState, header *eth.BeaconBlockHeader) error {
proposalEpoch := slots.ToEpoch(header.Slot)
currentEpoch := time.CurrentEpoch(st)

View File

@@ -33,7 +33,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -1,15 +1,11 @@
package peerdas
import (
stderrors "errors"
"iter"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/container/trie"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
)
@@ -20,7 +16,6 @@ var (
ErrIndexTooLarge = errors.New("column index is larger than the specified columns count")
ErrNoKzgCommitments = errors.New("no KZG commitments found")
ErrMismatchLength = errors.New("mismatch in the length of the column, commitments or proofs")
ErrEmptySegment = errors.New("empty segment in batch")
ErrInvalidKZGProof = errors.New("invalid KZG proof")
ErrBadRootLength = errors.New("bad root length")
ErrInvalidInclusionProof = errors.New("invalid inclusion proof")
@@ -62,127 +57,62 @@ func VerifyDataColumnSidecar(sidecar blocks.RODataColumn) error {
return nil
}
// CellProofBundleSegment is returned when a batch fails. The caller can call
// the `.Verify` method to verify just this segment.
type CellProofBundleSegment struct {
indices []uint64
commitments []kzg.Bytes48
cells []kzg.Cell
proofs []kzg.Bytes48
}
// Verify verifies this segment without batching.
func (s CellProofBundleSegment) Verify() error {
if len(s.cells) == 0 {
return ErrEmptySegment
}
verified, err := kzg.VerifyCellKZGProofBatch(s.commitments, s.indices, s.cells, s.proofs)
if err != nil {
return stderrors.Join(err, ErrInvalidKZGProof)
}
if !verified {
return ErrInvalidKZGProof
}
return nil
}
func VerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIter iter.Seq[blocks.CellProofBundle]) error {
// ignore the failed segment list since we are just passing in one segment.
_, err := BatchVerifyDataColumnsCellsKZGProofs(sizeHint, []iter.Seq[blocks.CellProofBundle]{cellProofsIter})
return err
}
// BatchVerifyDataColumnsCellsKZGProofs verifies if the KZG proofs are correct.
// VerifyDataColumnsSidecarKZGProofs verifies if the KZG proofs are correct.
// Note: We are slightly deviating from the specification here:
// The specification verifies the KZG proofs for each sidecar separately,
// while we are verifying all the KZG proofs from multiple sidecars in a batch.
// This is done to improve performance since the internal KZG library is way more
// efficient when verifying in batch. If the batch fails, the failed segments
// are returned to the caller so that they may try segment by segment without
// batching. On success the failed segment list is empty.
//
// efficient when verifying in batch.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs
func BatchVerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIters []iter.Seq[blocks.CellProofBundle]) ( /* failed segment list */ []CellProofBundleSegment, error) {
commitments := make([]kzg.Bytes48, 0, sizeHint)
indices := make([]uint64, 0, sizeHint)
cells := make([]kzg.Cell, 0, sizeHint)
proofs := make([]kzg.Bytes48, 0, sizeHint)
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) error {
// Compute the total count.
count := 0
for _, sidecar := range sidecars {
count += len(sidecar.Column)
}
var anySegmentEmpty bool
var segments []CellProofBundleSegment
for _, cellProofsIter := range cellProofsIters {
startIdx := len(cells)
for bundle := range cellProofsIter {
commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)
for _, sidecar := range sidecars {
for i := range sidecar.Column {
var (
commitment kzg.Bytes48
cell kzg.Cell
proof kzg.Bytes48
)
if len(bundle.Commitment) != len(commitment) ||
len(bundle.Cell) != len(cell) ||
len(bundle.Proof) != len(proof) {
return nil, ErrMismatchLength
commitmentBytes := sidecar.KzgCommitments[i]
cellBytes := sidecar.Column[i]
proofBytes := sidecar.KzgProofs[i]
if len(commitmentBytes) != len(commitment) ||
len(cellBytes) != len(cell) ||
len(proofBytes) != len(proof) {
return ErrMismatchLength
}
copy(commitment[:], bundle.Commitment)
copy(cell[:], bundle.Cell)
copy(proof[:], bundle.Proof)
copy(commitment[:], commitmentBytes)
copy(cell[:], cellBytes)
copy(proof[:], proofBytes)
commitments = append(commitments, commitment)
indices = append(indices, bundle.ColumnIndex)
indices = append(indices, sidecar.Index)
cells = append(cells, cell)
proofs = append(proofs, proof)
}
if len(cells[startIdx:]) == 0 {
anySegmentEmpty = true
}
segments = append(segments, CellProofBundleSegment{
indices: indices[startIdx:],
commitments: commitments[startIdx:],
cells: cells[startIdx:],
proofs: proofs[startIdx:],
})
}
if anySegmentEmpty {
return segments, ErrEmptySegment
}
// Batch verify that the cells match the corresponding commitments and proofs.
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
if err != nil {
return segments, stderrors.Join(err, ErrInvalidKZGProof)
return errors.Wrap(err, "verify cell KZG proof batch")
}
if !verified {
return segments, ErrInvalidKZGProof
}
return nil, nil
}
// verifyKzgCommitmentsInclusionProof is the shared implementation for inclusion proof verification.
func verifyKzgCommitmentsInclusionProof(bodyRoot []byte, kzgCommitments [][]byte, inclusionProof [][]byte) error {
if len(bodyRoot) != fieldparams.RootLength {
return ErrBadRootLength
}
leaves := blocks.LeavesFromCommitments(kzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(bodyRoot, hashTreeRoot[:], kzgPosition, inclusionProof)
if !verified {
return ErrInvalidInclusionProof
return ErrInvalidKZGProof
}
return nil
@@ -194,23 +124,30 @@ func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
if sidecar.SignedBlockHeader == nil || sidecar.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
return verifyKzgCommitmentsInclusionProof(
sidecar.SignedBlockHeader.Header.BodyRoot,
sidecar.KzgCommitments,
sidecar.KzgCommitmentsInclusionProof,
)
}
// VerifyPartialDataColumnHeaderInclusionProof verifies if the KZG commitments are included in the beacon block.
func VerifyPartialDataColumnHeaderInclusionProof(header *ethpb.PartialDataColumnHeader) error {
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
root := sidecar.SignedBlockHeader.Header.BodyRoot
if len(root) != fieldparams.RootLength {
return ErrBadRootLength
}
return verifyKzgCommitmentsInclusionProof(
header.SignedBlockHeader.Header.BodyRoot,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
leaves := blocks.LeavesFromCommitments(sidecar.KzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(root, hashTreeRoot[:], kzgPosition, sidecar.KzgCommitmentsInclusionProof)
if !verified {
return ErrInvalidInclusionProof
}
return nil
}
// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar.

View File

@@ -3,7 +3,6 @@ package peerdas_test
import (
"crypto/rand"
"fmt"
"iter"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
@@ -73,7 +72,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0] = sidecars[0].Column[0][:len(sidecars[0].Column[0])-1] // Remove one byte to create size mismatch
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
@@ -81,15 +80,14 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
failedSegments, err := peerdas.BatchVerifyDataColumnsCellsKZGProofs(blobCount, []iter.Seq[blocks.CellProofBundle]{blocks.RODataColumnsToCellProofBundles(sidecars)})
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.NoError(t, err)
require.Equal(t, 0, len(failedSegments))
})
}
@@ -275,7 +273,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -310,7 +308,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(allSidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -343,7 +341,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for _, sidecars := range allSidecars {
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(len(allSidecars), blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}

View File

@@ -5,7 +5,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -340,8 +339,7 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg
}
// ComputeCellsAndProofsFromStructured computes the cells and proofs from blobs and cell proofs.
// commitmentCount is required to return the correct sized bitlist even if we see a nil slice of blobsAndProofs.
func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs []*pb.BlobAndProofV2) (bitfield.Bitlist /* parts included */, [][]kzg.Cell, [][]kzg.Proof, error) {
func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([][]kzg.Cell, [][]kzg.Proof, error) {
start := time.Now()
defer func() {
cellsAndProofsFromStructuredComputationTime.Observe(float64(time.Since(start).Milliseconds()))
@@ -349,24 +347,14 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
var wg errgroup.Group
var blobsPresent int
for _, blobAndProof := range blobsAndProofs {
if blobAndProof != nil {
blobsPresent++
}
}
cellsPerBlob := make([][]kzg.Cell, blobsPresent)
proofsPerBlob := make([][]kzg.Proof, blobsPresent)
included := bitfield.NewBitlist(commitmentCount)
cellsPerBlob := make([][]kzg.Cell, len(blobsAndProofs))
proofsPerBlob := make([][]kzg.Proof, len(blobsAndProofs))
var j int
for i, blobAndProof := range blobsAndProofs {
if blobAndProof == nil {
continue
return nil, nil, ErrNilBlobAndProof
}
included.SetBitAt(uint64(i), true)
compactIndex := j
wg.Go(func() error {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) {
@@ -393,18 +381,17 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
kzgProofs = append(kzgProofs, kzgProof)
}
cellsPerBlob[compactIndex] = cells
proofsPerBlob[compactIndex] = kzgProofs
cellsPerBlob[i] = cells
proofsPerBlob[i] = kzgProofs
return nil
})
j++
}
if err := wg.Wait(); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// ReconstructBlobs reconstructs blobs from data column sidecars without computing KZG proofs or creating sidecars.

View File

@@ -479,9 +479,8 @@ func TestComputeCellsAndProofsFromFlat(t *testing.T) {
func TestComputeCellsAndProofsFromStructured(t *testing.T) {
t.Run("nil blob and proof", func(t *testing.T) {
included, _, _, err := peerdas.ComputeCellsAndProofsFromStructured(0, []*pb.BlobAndProofV2{nil})
require.NoError(t, err)
require.Equal(t, uint64(0), included.Count())
_, _, err := peerdas.ComputeCellsAndProofsFromStructured([]*pb.BlobAndProofV2{nil})
require.ErrorIs(t, err, peerdas.ErrNilBlobAndProof)
})
t.Run("nominal", func(t *testing.T) {
@@ -534,8 +533,7 @@ func TestComputeCellsAndProofsFromStructured(t *testing.T) {
require.NoError(t, err)
// Test ComputeCellsAndProofs
included, actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(blobsAndProofs)), blobsAndProofs)
require.Equal(t, included.Count(), uint64(len(actualCellsPerBlob)))
actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobsAndProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsPerBlob))

View File

@@ -3,7 +3,6 @@ package peerdas
import (
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -24,13 +23,11 @@ var (
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
PartialDataColumnHeaderType = "PartialDataColumnHeader"
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
)
type (
@@ -57,10 +54,6 @@ type (
blocks.VerifiedRODataColumn
}
PartialDataColumnHeaderReconstructionSource struct {
*ethpb.PartialDataColumnHeader
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
@@ -78,11 +71,6 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -155,40 +143,6 @@ func DataColumnSidecars(cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof,
return roSidecars, nil
}
func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof, src ConstructionPopulator) ([]blocks.PartialDataColumn, error) {
start := time.Now()
const numberOfColumns = uint64(fieldparams.NumberOfColumns)
cells, proofs, err := rotateRowsToCols(cellsPerBlob, proofsPerBlob, numberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
dataColumns := make([]blocks.PartialDataColumn, 0, numberOfColumns)
for idx := range numberOfColumns {
dc, err := blocks.NewPartialDataColumn(info.signedBlockHeader, idx, info.kzgCommitments, info.kzgInclusionProof)
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
for i := range len(info.kzgCommitments) {
if !included.BitAt(uint64(i)) {
continue
}
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
dataColumns = append(dataColumns, dc)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return dataColumns, nil
}
// Slot returns the slot of the source
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()
@@ -300,43 +254,3 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
return info, nil
}
// Slot returns the slot from the partial data column header
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
return p.SignedBlockHeader.Header.Slot
}
// Root returns the block root computed from the header
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return [fieldparams.RootLength]byte{}
}
return root
}
// ProposerIndex returns the proposer index from the header
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return p.SignedBlockHeader.Header.ProposerIndex
}
// Commitments returns the KZG commitments from the header
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
return p.KzgCommitments, nil
}
// Type returns the type of the source
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
return PartialDataColumnHeaderType
}
// extract extracts the block information from the partial header
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: p.SignedBlockHeader,
kzgCommitments: p.KzgCommitments,
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -267,31 +267,4 @@ func TestReconstructionSource(t *testing.T) {
require.Equal(t, peerdas.SidecarType, src.Type())
})
t.Run("from partial header", func(t *testing.T) {
referenceSidecar := sidecars[0]
partialHeader := &ethpb.PartialDataColumnHeader{
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
KzgCommitments: referenceSidecar.KzgCommitments,
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
}
src := peerdas.PopulateFromPartialHeader(partialHeader)
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
// Compute expected root
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expectedRoot, src.Root())
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
})
}

View File

@@ -143,10 +143,11 @@ func ProcessSlot(ctx context.Context, state state.BeaconState) (state.BeaconStat
return nil, err
}
// Spec v1.6.1 (pseudocode):
// <spec fn="process_slot" fork="gloas" lines="11-13" hash="62b28839">
// # [New in Gloas:EIP7732]
// # Unset the next payload availability
// state.execution_payload_availability[(state.slot + 1) % SLOTS_PER_HISTORICAL_ROOT] = 0b0
// </spec>
if state.Version() >= version.Gloas {
index := uint64((state.Slot() + 1) % params.BeaconConfig().SlotsPerHistoricalRoot)
if err := state.UpdateExecutionPayloadAvailabilityAtIndex(index, 0x0); err != nil {

View File

@@ -78,7 +78,7 @@ func newGloasState(t *testing.T, slot primitives.Slot, availability []byte) stat
BlockHash: make([]byte, 32),
PrevRandao: make([]byte, 32),
FeeRecipient: make([]byte, 20),
BlobKzgCommitmentsRoot: make([]byte, 32),
BlobKzgCommitments: [][]byte{make([]byte, 48)},
},
Eth1Data: &ethpb.Eth1Data{
DepositRoot: make([]byte, 32),

View File

@@ -66,6 +66,10 @@ type ReadOnlyDatabase interface {
OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
BackfillStatus(context.Context) (*dbval.BackfillStatus, error)
// Execution payload envelope operations (Gloas+).
ExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error)
HasExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) bool
// P2P Metadata operations.
MetadataSeqNum(ctx context.Context) (uint64, error)
}
@@ -115,6 +119,10 @@ type NoHeadAccessDatabase interface {
SaveLightClientUpdate(ctx context.Context, period uint64, update interfaces.LightClientUpdate) error
SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error
// Execution payload envelope operations (Gloas+).
SaveExecutionPayloadEnvelope(ctx context.Context, envelope *ethpb.SignedExecutionPayloadEnvelope) error
DeleteExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) error
CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot, batchSize int) (int, error)

View File

@@ -13,6 +13,7 @@ go_library(
"encoding.go",
"error.go",
"execution_chain.go",
"execution_payload_envelope.go",
"finalized_block_roots.go",
"genesis.go",
"key.go",
@@ -96,6 +97,7 @@ go_test(
"deposit_contract_test.go",
"encoding_test.go",
"execution_chain_test.go",
"execution_payload_envelope_test.go",
"finalized_block_roots_test.go",
"genesis_test.go",
"init_test.go",

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
@@ -33,6 +34,9 @@ func (s *Store) LastArchivedRoot(ctx context.Context) [32]byte {
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
_, blockRoot = bkt.Cursor().Last()
if len(blockRoot) > 0 {
blockRoot = slices.Clone(blockRoot)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.
@@ -51,6 +55,9 @@ func (s *Store) ArchivedPointRoot(ctx context.Context, slot primitives.Slot) [32
if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSlotIndicesBucket)
blockRoot = bucket.Get(bytesutil.SlotToBytesBigEndian(slot))
if len(blockRoot) > 0 {
blockRoot = slices.Clone(blockRoot)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.

View File

@@ -517,6 +517,10 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p
return errors.Wrap(err, "could not delete validators")
}
// TODO: execution payload envelopes (Gloas+) are keyed by execution payload
// block hash, not beacon block root, so they cannot be pruned in this loop.
// A separate pruning mechanism is needed (e.g. secondary index or cursor scan).
numSlotsDeleted++
}
@@ -812,7 +816,10 @@ func (s *Store) FeeRecipientByValidatorID(ctx context.Context, id primitives.Val
var addr []byte
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(feeRecipientBucket)
addr = bkt.Get(bytesutil.Uint64ToBytesBigEndian(uint64(id)))
stored := bkt.Get(bytesutil.Uint64ToBytesBigEndian(uint64(id)))
if len(stored) > 0 {
addr = slices.Clone(stored)
}
// IF the fee recipient is not found in the standard fee recipient bucket, then
// check the registration bucket. The fee recipient may be there.
// This is to resolve imcompatility until we fully migrate to the registration bucket.
@@ -826,7 +833,7 @@ func (s *Store) FeeRecipientByValidatorID(ctx context.Context, id primitives.Val
if err := decode(ctx, enc, reg); err != nil {
return err
}
addr = reg.FeeRecipient
addr = slices.Clone(reg.FeeRecipient)
}
return nil
})
@@ -1247,6 +1254,12 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.ReadOnlySignedBea
if err := rawBlock.UnmarshalSSZ(enc[len(fuluBlindKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal blinded Fulu block")
}
case hasGloasKey(enc):
// post Gloas we save the full beacon block as EIP-7732 separates beacon block and payload
rawBlock = &ethpb.SignedBeaconBlockGloas{}
if err := rawBlock.UnmarshalSSZ(enc[len(gloasKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal Gloas block")
}
default:
// Marshal block bytes to phase 0 beacon block.
rawBlock = &ethpb.SignedBeaconBlock{}
@@ -1277,6 +1290,11 @@ func encodeBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
func keyForBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
v := blk.Version()
if v >= version.Gloas {
// Gloas blocks are never blinded (no execution payload in block body).
return gloasKey, nil
}
if v >= version.Fulu {
if blk.IsBlinded() {
return fuluBlindKey, nil

View File

@@ -151,6 +151,17 @@ var blockTests = []struct {
}
return blocks.NewSignedBeaconBlock(b)
}},
{
name: "gloas",
newBlock: func(slot primitives.Slot, root []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
b := util.NewBeaconBlockGloas()
b.Block.Slot = slot
if root != nil {
b.Block.ParentRoot = root
}
return blocks.NewSignedBeaconBlock(b)
},
},
}
func TestStore_SaveBlock_NoDuplicates(t *testing.T) {
@@ -211,7 +222,7 @@ func TestStore_BlocksCRUD(t *testing.T) {
retrievedBlock, err = db.Block(ctx, blockRoot)
require.NoError(t, err)
wanted := retrievedBlock
if retrievedBlock.Version() >= version.Bellatrix {
if retrievedBlock.Version() >= version.Bellatrix && retrievedBlock.Version() < version.Gloas {
wanted, err = retrievedBlock.ToBlinded()
require.NoError(t, err)
}
@@ -643,7 +654,7 @@ func TestStore_BlocksCRUD_NoCache(t *testing.T) {
require.NoError(t, err)
wanted := blk
if blk.Version() >= version.Bellatrix {
if blk.Version() >= version.Bellatrix && blk.Version() < version.Gloas {
wanted, err = blk.ToBlinded()
require.NoError(t, err)
}
@@ -1014,7 +1025,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.Gloas {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
@@ -1032,7 +1043,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted2 := block2
if block2.Version() >= version.Bellatrix {
if block2.Version() >= version.Bellatrix && block2.Version() < version.Gloas {
wanted2, err = block2.ToBlinded()
require.NoError(t, err)
}
@@ -1050,7 +1061,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = block3
if block3.Version() >= version.Bellatrix {
if block3.Version() >= version.Bellatrix && block3.Version() < version.Gloas {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
@@ -1086,7 +1097,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.Gloas {
wanted, err = block1.ToBlinded()
require.NoError(t, err)
}
@@ -1103,7 +1114,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.Gloas {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
@@ -1120,7 +1131,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.Gloas {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
@@ -1216,7 +1227,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)
wanted := b1
if b1.Version() >= version.Bellatrix {
if b1.Version() >= version.Bellatrix && b1.Version() < version.Gloas {
wanted, err = b1.ToBlinded()
require.NoError(t, err)
}
@@ -1232,7 +1243,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
t.Fatalf("Expected 2 blocks, received %d blocks", len(retrievedBlocks))
}
wanted = b2
if b2.Version() >= version.Bellatrix {
if b2.Version() >= version.Bellatrix && b2.Version() < version.Gloas {
wanted, err = b2.ToBlinded()
require.NoError(t, err)
}
@@ -1242,7 +1253,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, true, proto.Equal(wantedPb, retrieved0Pb), "Wanted: %v, received: %v", retrievedBlocks[0], wanted)
wanted = b3
if b3.Version() >= version.Bellatrix {
if b3.Version() >= version.Bellatrix && b3.Version() < version.Gloas {
wanted, err = b3.ToBlinded()
require.NoError(t, err)
}

View File

@@ -3,6 +3,7 @@ package kv
import (
"context"
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/ethereum/go-ethereum/common"
@@ -17,7 +18,10 @@ func (s *Store) DepositContractAddress(ctx context.Context) ([]byte, error) {
var addr []byte
if err := s.db.View(func(tx *bolt.Tx) error {
chainInfo := tx.Bucket(chainMetadataBucket)
addr = chainInfo.Get(depositContractAddressKey)
stored := chainInfo.Get(depositContractAddressKey)
if len(stored) > 0 {
addr = slices.Clone(stored)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.

View File

@@ -0,0 +1,123 @@
package kv
import (
"context"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/golang/snappy"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// SaveExecutionPayloadEnvelope blinds and saves a signed execution payload envelope keyed by
// beacon block root. The envelope is stored in blinded form: the full execution payload is replaced
// with its block hash. The full payload can later be retrieved from the EL via
// engine_getPayloadBodiesByHash.
func (s *Store) SaveExecutionPayloadEnvelope(ctx context.Context, env *ethpb.SignedExecutionPayloadEnvelope) error {
_, span := trace.StartSpan(ctx, "BeaconDB.SaveExecutionPayloadEnvelope")
defer span.End()
if env == nil || env.Message == nil || env.Message.Payload == nil {
return errors.New("cannot save nil execution payload envelope")
}
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
blinded := blindEnvelope(env)
enc, err := encodeBlindedEnvelope(blinded)
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
return bkt.Put(blockRoot[:], enc)
})
}
// ExecutionPayloadEnvelope retrieves the blinded signed execution payload envelope by beacon block root.
func (s *Store) ExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.ExecutionPayloadEnvelope")
defer span.End()
var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
enc = bkt.Get(blockRoot[:])
return nil
}); err != nil {
return nil, err
}
if enc == nil {
return nil, errors.Wrap(ErrNotFound, "execution payload envelope not found")
}
return decodeBlindedEnvelope(enc)
}
// HasExecutionPayloadEnvelope checks whether an execution payload envelope exists for the given beacon block root.
func (s *Store) HasExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) bool {
_, span := trace.StartSpan(ctx, "BeaconDB.HasExecutionPayloadEnvelope")
defer span.End()
var exists bool
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
exists = bkt.Get(blockRoot[:]) != nil
return nil
}); err != nil {
return false
}
return exists
}
// DeleteExecutionPayloadEnvelope removes a signed execution payload envelope by beacon block root.
func (s *Store) DeleteExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) error {
_, span := trace.StartSpan(ctx, "BeaconDB.DeleteExecutionPayloadEnvelope")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
return bkt.Delete(blockRoot[:])
})
}
// blindEnvelope converts a full signed envelope to its blinded form by replacing
// the execution payload with its block hash. This avoids computing the expensive
// payload hash tree root on the critical path.
func blindEnvelope(env *ethpb.SignedExecutionPayloadEnvelope) *ethpb.SignedBlindedExecutionPayloadEnvelope {
return &ethpb.SignedBlindedExecutionPayloadEnvelope{
Message: &ethpb.BlindedExecutionPayloadEnvelope{
BlockHash: env.Message.Payload.BlockHash,
ExecutionRequests: env.Message.ExecutionRequests,
BuilderIndex: env.Message.BuilderIndex,
BeaconBlockRoot: env.Message.BeaconBlockRoot,
Slot: env.Message.Slot,
StateRoot: env.Message.StateRoot,
},
Signature: env.Signature,
}
}
// encodeBlindedEnvelope SSZ-encodes and snappy-compresses a blinded envelope for storage.
func encodeBlindedEnvelope(env *ethpb.SignedBlindedExecutionPayloadEnvelope) ([]byte, error) {
sszBytes, err := env.MarshalSSZ()
if err != nil {
return nil, errors.Wrap(err, "could not marshal blinded envelope")
}
return snappy.Encode(nil, sszBytes), nil
}
// decodeBlindedEnvelope snappy-decompresses and SSZ-decodes a blinded envelope from storage.
func decodeBlindedEnvelope(enc []byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error) {
dec, err := snappy.Decode(nil, enc)
if err != nil {
return nil, errors.Wrap(err, "could not snappy decode envelope")
}
blinded := &ethpb.SignedBlindedExecutionPayloadEnvelope{}
if err := blinded.UnmarshalSSZ(dec); err != nil {
return nil, errors.Wrap(err, "could not unmarshal blinded envelope")
}
return blinded, nil
}

View File

@@ -0,0 +1,124 @@
package kv
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func testEnvelope(t *testing.T) *ethpb.SignedExecutionPayloadEnvelope {
t.Helper()
return &ethpb.SignedExecutionPayloadEnvelope{
Message: &ethpb.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadDeneb{
ParentHash: bytesutil.PadTo([]byte("parent"), 32),
FeeRecipient: bytesutil.PadTo([]byte("fee"), 20),
StateRoot: bytesutil.PadTo([]byte("stateroot"), 32),
ReceiptsRoot: bytesutil.PadTo([]byte("receipts"), 32),
LogsBloom: bytesutil.PadTo([]byte{}, 256),
PrevRandao: bytesutil.PadTo([]byte("randao"), 32),
BlockNumber: 100,
GasLimit: 30000000,
GasUsed: 21000,
Timestamp: 1000,
ExtraData: []byte("extra"),
BaseFeePerGas: bytesutil.PadTo([]byte{1}, 32),
BlockHash: bytesutil.PadTo([]byte("blockhash"), 32),
Transactions: [][]byte{[]byte("tx1"), []byte("tx2")},
Withdrawals: []*enginev1.Withdrawal{{Index: 1, ValidatorIndex: 2, Address: bytesutil.PadTo([]byte("addr"), 20), Amount: 100}},
BlobGasUsed: 131072,
ExcessBlobGas: 0,
},
ExecutionRequests: &enginev1.ExecutionRequests{},
BuilderIndex: primitives.BuilderIndex(42),
BeaconBlockRoot: bytesutil.PadTo([]byte("beaconroot"), 32),
Slot: primitives.Slot(99),
StateRoot: bytesutil.PadTo([]byte("envelopestateroot"), 32),
},
Signature: bytesutil.PadTo([]byte("sig"), 96),
}
}
func TestStore_SaveAndRetrieveExecutionPayloadEnvelope(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
env := testEnvelope(t)
// Keyed by beacon block root.
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
// Initially should not exist.
assert.Equal(t, false, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
// Save (always blinds internally).
require.NoError(t, db.SaveExecutionPayloadEnvelope(ctx, env))
// Should exist now.
assert.Equal(t, true, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
// Load and verify it's blinded.
loaded, err := db.ExecutionPayloadEnvelope(ctx, blockRoot)
require.NoError(t, err)
// Verify metadata is preserved.
assert.Equal(t, env.Message.Slot, loaded.Message.Slot)
assert.Equal(t, env.Message.BuilderIndex, loaded.Message.BuilderIndex)
assert.DeepEqual(t, env.Message.BeaconBlockRoot, loaded.Message.BeaconBlockRoot)
assert.DeepEqual(t, env.Message.StateRoot, loaded.Message.StateRoot)
assert.DeepEqual(t, env.Signature, loaded.Signature)
// BlockHash should be the payload's block hash (not a hash tree root).
assert.DeepEqual(t, env.Message.Payload.BlockHash, loaded.Message.BlockHash)
}
func TestStore_DeleteExecutionPayloadEnvelope(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
env := testEnvelope(t)
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
require.NoError(t, db.SaveExecutionPayloadEnvelope(ctx, env))
assert.Equal(t, true, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
require.NoError(t, db.DeleteExecutionPayloadEnvelope(ctx, blockRoot))
assert.Equal(t, false, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
}
func TestStore_ExecutionPayloadEnvelope_NotFound(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
nonExistent := bytesutil.ToBytes32([]byte("nonexistent"))
_, err := db.ExecutionPayloadEnvelope(ctx, nonExistent)
require.ErrorContains(t, "not found", err)
}
func TestStore_SaveExecutionPayloadEnvelope_NilRejected(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
err := db.SaveExecutionPayloadEnvelope(ctx, nil)
require.ErrorContains(t, "nil", err)
}
func TestBlindEnvelope_PreservesBlockHash(t *testing.T) {
env := testEnvelope(t)
blinded := blindEnvelope(env)
// Should contain the block hash from the payload, not a hash tree root.
assert.DeepEqual(t, env.Message.Payload.BlockHash, blinded.Message.BlockHash)
// Metadata should be preserved.
assert.Equal(t, env.Message.BuilderIndex, blinded.Message.BuilderIndex)
assert.Equal(t, env.Message.Slot, blinded.Message.Slot)
assert.DeepEqual(t, env.Message.BeaconBlockRoot, blinded.Message.BeaconBlockRoot)
assert.DeepEqual(t, env.Message.StateRoot, blinded.Message.StateRoot)
assert.DeepEqual(t, env.Signature, blinded.Signature)
}

View File

@@ -87,3 +87,10 @@ func hasFuluBlindKey(enc []byte) bool {
}
return bytes.Equal(enc[:len(fuluBlindKey)], fuluBlindKey)
}
func hasGloasKey(enc []byte) bool {
if len(gloasKey) >= len(enc) {
return false
}
return bytes.Equal(enc[:len(gloasKey)], gloasKey)
}

View File

@@ -126,6 +126,7 @@ var Buckets = [][]byte{
feeRecipientBucket,
registrationBucket,
custodyBucket,
executionPayloadEnvelopesBucket,
}
// KVStoreOption is a functional option that modifies a kv.Store.

View File

@@ -199,7 +199,7 @@ func performValidatorStateMigration(ctx context.Context, bar *progressbar.Progre
func stateBucketKeys(stateBucket *bolt.Bucket) ([][]byte, error) {
var keys [][]byte
if err := stateBucket.ForEach(func(pubKey, v []byte) error {
keys = append(keys, pubKey)
keys = append(keys, bytes.Clone(pubKey))
return nil
}); err != nil {
return nil, err

View File

@@ -7,16 +7,17 @@ package kv
// it easy to scan for keys that have a certain shard number as a prefix and return those
// corresponding attestations.
var (
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
stateDiffBucket = []byte("state-diff")
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
stateDiffBucket = []byte("state-diff")
executionPayloadEnvelopesBucket = []byte("execution-payload-envelopes")
// Light Client Updates Bucket
lightClientUpdatesBucket = []byte("light-client-updates")
@@ -60,6 +61,8 @@ var (
electraBlindKey = []byte("blind-electra")
fuluKey = []byte("fulu")
fuluBlindKey = []byte("blind-fulu")
gloasKey = []byte("gloas")
// No gloasBlindKey needed - Gloas blocks are never blinded (no execution payload in block body).
// block root included in the beacon state used by weak subjectivity initial sync
originCheckpointBlockRootKey = []byte("origin-checkpoint-block-root")

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
@@ -187,20 +188,23 @@ func (s *Store) getDiff(lvl int, slot uint64) (hdiff.HdiffBytes, error) {
return bolt.ErrBucketNotFound
}
buf := append(key, stateSuffix...)
stateDiff = bucket.Get(buf)
if stateDiff == nil {
rawStateDiff := bucket.Get(buf)
if len(rawStateDiff) == 0 {
return errors.New("state diff not found")
}
stateDiff = slices.Clone(rawStateDiff)
buf = append(key, validatorSuffix...)
validatorDiff = bucket.Get(buf)
if validatorDiff == nil {
rawValidatorDiff := bucket.Get(buf)
if len(rawValidatorDiff) == 0 {
return errors.New("validator diff not found")
}
validatorDiff = slices.Clone(rawValidatorDiff)
buf = append(key, balancesSuffix...)
balancesDiff = bucket.Get(buf)
if balancesDiff == nil {
rawBalancesDiff := bucket.Get(buf)
if len(rawBalancesDiff) == 0 {
return errors.New("balances diff not found")
}
balancesDiff = slices.Clone(rawBalancesDiff)
return nil
})
@@ -224,10 +228,11 @@ func (s *Store) getFullSnapshot(slot uint64) (state.BeaconState, error) {
if bucket == nil {
return bolt.ErrBucketNotFound
}
enc = bucket.Get(key)
if enc == nil {
rawEnc := bucket.Get(key)
if rawEnc == nil {
return errors.New("state not found")
}
enc = slices.Clone(rawEnc)
return nil
})

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -47,7 +48,11 @@ func (s *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.St
}
var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
enc = tx.Bucket(stateSummaryBucket).Get(blockRoot[:])
rawEnc := tx.Bucket(stateSummaryBucket).Get(blockRoot[:])
if len(rawEnc) == 0 {
return nil
}
enc = slices.Clone(rawEnc)
return nil
}); err != nil {
return nil, err

View File

@@ -73,7 +73,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",

View File

@@ -7,7 +7,6 @@ import (
"strings"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
@@ -59,7 +58,6 @@ var (
fuluEngineEndpoints = []string{
GetPayloadMethodV5,
GetBlobsV2,
GetBlobsV3,
}
)
@@ -101,8 +99,6 @@ const (
GetBlobsV1 = "engine_getBlobsV1"
// GetBlobsV2 request string for JSON-RPC.
GetBlobsV2 = "engine_getBlobsV2"
// GetBlobsV3 request string for JSON-RPC.
GetBlobsV3 = "engine_getBlobsV3"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
@@ -126,7 +122,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -557,22 +553,6 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return result, handleRPCError(err)
}
func (s *Service) GetBlobsV3(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV3")
defer span.End()
start := time.Now()
if !s.capabilityCache.has(GetBlobsV3) {
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV3))
}
getBlobsV3RequestsTotal.Inc()
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV3, versionedHashes)
getBlobsV3Latency.Observe(time.Since(start).Seconds())
return result, handleRPCError(err)
}
// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
@@ -683,47 +663,40 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
root := populator.Root()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
commitments, err := populator.Commitments()
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "commitments")
return nil, wrapWithBlockRoot(err, root, "commitments")
}
included, cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
log.Info("Received cells and proofs from execution client", "included", included, "cells count", len(cellsPerBlob), "err", err)
cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, populator)
haveAllBlobs := included.Count() == uint64(len(commitments))
log.Info("Constructed partial columns", "haveAllBlobs", haveAllBlobs)
if haveAllBlobs {
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, partialColumns, nil
// Return early if nothing is returned from the EL.
if len(cellsPerBlob) == 0 {
return nil, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "partial columns from column sidecar")
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
return nil, partialColumns, nil
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, nil
}
// fetchCellsAndProofsFromExecution fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) (bitfield.Bitlist /* included parts */, [][]kzg.Cell, [][]kzg.Proof, error) {
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) ([][]kzg.Cell, [][]kzg.Proof, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -731,34 +704,24 @@ func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommi
versionedHashes = append(versionedHashes, versionedHash)
}
var blobAndProofs []*pb.BlobAndProofV2
// Fetch all blobsAndCellsProofs from the execution client.
var err error
useV3 := s.capabilityCache.has(GetBlobsV3)
if useV3 {
// v3 can return a partial response. V2 is all or nothing
blobAndProofs, err = s.GetBlobsV3(ctx, versionedHashes)
} else {
blobAndProofs, err = s.GetBlobsV2(ctx, versionedHashes)
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, nil, errors.Wrapf(err, "get blobs V2")
}
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "get blobs V2/3")
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
return nil, nil, nil
}
// Compute cells and proofs from the blobs and cell proofs.
included, cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(kzgCommitments)), blobAndProofs)
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobAndProofV2s)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
}
if included.Count() == uint64(len(kzgCommitments)) {
getBlobsV3CompleteResponsesTotal.Inc()
} else if included.Count() > 0 {
getBlobsV3PartialResponsesTotal.Inc()
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.

View File

@@ -2587,7 +2587,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
@@ -2598,7 +2598,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2611,7 +2611,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})

View File

@@ -34,25 +34,6 @@ var (
Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000},
},
)
getBlobsV3RequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_requests_total",
Help: "Total number of engine_getBlobsV3 requests sent",
})
getBlobsV3CompleteResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_complete_responses_total",
Help: "Total number of complete engine_getBlobsV3 successful responses received",
})
getBlobsV3PartialResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_partial_responses_total",
Help: "Total number of engine_getBlobsV3 partial responses received",
})
getBlobsV3Latency = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "beacon_engine_getBlobsV3_request_duration_seconds",
Help: "Duration of engine_getBlobsV3 requests in seconds",
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 4},
},
)
errParseCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_parse_error_count",
Help: "The number of errors that occurred while parsing execution payload",

View File

@@ -118,8 +118,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
}
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
return e.DataColumnSidecars, nil, e.ErrorDataColumnSidecars
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
}
// GetTerminalBlockHash --

View File

@@ -6,7 +6,6 @@ go_library(
"doc.go",
"errors.go",
"forkchoice.go",
"last_root.go",
"log.go",
"metrics.go",
"node.go",
@@ -51,7 +50,6 @@ go_test(
srcs = [
"ffg_update_test.go",
"forkchoice_test.go",
"last_root_test.go",
"no_vote_test.go",
"node_test.go",
"on_tick_test.go",

View File

@@ -32,7 +32,6 @@ func New() *ForkChoice {
finalizedCheckpoint: &forkchoicetypes.Checkpoint{},
proposerBoostRoot: [32]byte{},
nodeByRoot: make(map[[fieldparams.RootLength]byte]*Node),
nodeByPayload: make(map[[fieldparams.RootLength]byte]*Node),
slashedIndices: make(map[primitives.ValidatorIndex]bool),
receivedBlocksLastEpoch: [fieldparams.SlotsPerEpoch]primitives.Slot{},
}

View File

@@ -1,26 +0,0 @@
package doublylinkedtree
import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/time/slots"
)
// LastRoot returns the last canonical block root in the given epoch
func (f *ForkChoice) LastRoot(epoch primitives.Epoch) [32]byte {
head := f.store.headNode
headEpoch := slots.ToEpoch(head.slot)
epochEnd, err := slots.EpochEnd(epoch)
if err != nil {
return [32]byte{}
}
if headEpoch <= epoch {
return head.root
}
for head != nil && head.slot > epochEnd {
head = head.parent
}
if head == nil {
return [32]byte{}
}
return head.root
}

View File

@@ -1,38 +0,0 @@
package doublylinkedtree
import (
"testing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestLastRoot(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
st, root, err := prepareForkchoiceState(ctx, 1, [32]byte{'1'}, params.BeaconConfig().ZeroHash, [32]byte{'1'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 2, [32]byte{'2'}, [32]byte{'1'}, [32]byte{'2'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 3, [32]byte{'3'}, [32]byte{'1'}, [32]byte{'3'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 32, [32]byte{'4'}, [32]byte{'3'}, [32]byte{'4'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 33, [32]byte{'5'}, [32]byte{'2'}, [32]byte{'5'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 34, [32]byte{'6'}, [32]byte{'5'}, [32]byte{'6'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
headNode := f.store.nodeByRoot[[32]byte{'6'}]
f.store.headNode = headNode
require.Equal(t, [32]byte{'6'}, f.store.headNode.root)
require.Equal(t, [32]byte{'2'}, f.LastRoot(0))
require.Equal(t, [32]byte{'6'}, f.LastRoot(1))
require.Equal(t, [32]byte{'6'}, f.LastRoot(2))
}

View File

@@ -94,6 +94,5 @@ func (s *Store) removeNodeAndChildren(ctx context.Context, node *Node, invalidRo
s.previousProposerBoostScore = 0
}
delete(s.nodeByRoot, node.root)
delete(s.nodeByPayload, node.payloadHash)
return invalidRoots, nil
}

View File

@@ -113,7 +113,6 @@ func (s *Store) insert(ctx context.Context,
}
}
s.nodeByPayload[payloadHash] = n
s.nodeByRoot[root] = n
if parent == nil {
if s.treeRootNode == nil {
@@ -122,7 +121,6 @@ func (s *Store) insert(ctx context.Context,
s.highestReceivedNode = n
} else {
delete(s.nodeByRoot, root)
delete(s.nodeByPayload, payloadHash)
return nil, errInvalidParentRoot
}
} else {
@@ -191,7 +189,6 @@ func (s *Store) pruneFinalizedNodeByRootMap(ctx context.Context, node, finalized
node.children = nil
delete(s.nodeByRoot, node.root)
delete(s.nodeByPayload, node.payloadHash)
return nil
}
@@ -273,21 +270,6 @@ func (f *ForkChoice) HighestReceivedBlockSlot() primitives.Slot {
return f.store.highestReceivedNode.slot
}
// HighestReceivedBlockDelay returns the number of slots that the highest
// received block was late when receiving it. For example, a block was late by 12 slots,
// then this method is expected to return 12.
func (f *ForkChoice) HighestReceivedBlockDelay() primitives.Slot {
n := f.store.highestReceivedNode
if n == nil {
return 0
}
sss, err := slots.SinceSlotStart(n.slot, f.store.genesisTime, n.timestamp)
if err != nil {
return 0
}
return primitives.Slot(uint64(sss/time.Second) / params.BeaconConfig().SecondsPerSlot)
}
// ReceivedBlocksLastEpoch returns the number of blocks received in the last epoch
func (f *ForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
count := uint64(0)

View File

@@ -128,10 +128,9 @@ func TestStore_Insert(t *testing.T) {
// The new node does not have a parent.
treeRootNode := &Node{slot: 0, root: indexToHash(0)}
nodeByRoot := map[[32]byte]*Node{indexToHash(0): treeRootNode}
nodeByPayload := map[[32]byte]*Node{indexToHash(0): treeRootNode}
jc := &forkchoicetypes.Checkpoint{Epoch: 0}
fc := &forkchoicetypes.Checkpoint{Epoch: 0}
s := &Store{nodeByRoot: nodeByRoot, treeRootNode: treeRootNode, nodeByPayload: nodeByPayload, justifiedCheckpoint: jc, finalizedCheckpoint: fc, highestReceivedNode: &Node{}}
s := &Store{nodeByRoot: nodeByRoot, treeRootNode: treeRootNode, justifiedCheckpoint: jc, finalizedCheckpoint: fc, highestReceivedNode: &Node{}}
payloadHash := [32]byte{'a'}
ctx := t.Context()
_, blk, err := prepareForkchoiceState(ctx, 100, indexToHash(100), indexToHash(0), payloadHash, 1, 1)
@@ -238,7 +237,6 @@ func TestStore_Prune_NoDanglingBranch(t *testing.T) {
s.finalizedCheckpoint.Root = indexToHash(1)
require.NoError(t, s.prune(t.Context()))
require.Equal(t, len(s.nodeByRoot), 1)
require.Equal(t, len(s.nodeByPayload), 1)
}
// This test starts with the following branching diagram
@@ -319,8 +317,6 @@ func TestStore_PruneMapsNodes(t *testing.T) {
s.finalizedCheckpoint.Root = indexToHash(1)
require.NoError(t, s.prune(t.Context()))
require.Equal(t, len(s.nodeByRoot), 1)
require.Equal(t, len(s.nodeByPayload), 1)
}
func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
@@ -339,7 +335,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64
// Received block last epoch is 1
@@ -352,7 +347,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(64), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64 65
// Received block last epoch is 2
@@ -365,7 +359,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(2), count)
require.Equal(t, primitives.Slot(65), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockDelay())
// 64 65 66
// Received block last epoch is 3
@@ -717,17 +710,3 @@ func TestStore_CleanupInserting(t *testing.T) {
require.NotNil(t, f.InsertNode(ctx, st, blk))
require.Equal(t, false, f.HasNode(blk.Root()))
}
func TestStore_HighestReceivedBlockDelay(t *testing.T) {
f := ForkChoice{
store: &Store{
genesisTime: time.Unix(0, 0),
highestReceivedNode: &Node{
slot: 10,
timestamp: time.Unix(int64(((10 + 12) * params.BeaconConfig().SecondsPerSlot)), 0), // 12 slots late
},
},
}
require.Equal(t, primitives.Slot(12), f.HighestReceivedBlockDelay())
}

View File

@@ -36,7 +36,6 @@ type Store struct {
treeRootNode *Node // the root node of the store tree.
headNode *Node // last head Node
nodeByRoot map[[fieldparams.RootLength]byte]*Node // nodes indexed by roots.
nodeByPayload map[[fieldparams.RootLength]byte]*Node // nodes indexed by payload Hash
slashedIndices map[primitives.ValidatorIndex]bool // the list of equivocating validator indices
originRoot [fieldparams.RootLength]byte // The genesis block root
genesisTime time.Time

View File

@@ -67,13 +67,11 @@ type FastGetter interface {
HasNode([32]byte) bool
HighestReceivedBlockSlot() primitives.Slot
HighestReceivedBlockRoot() [32]byte
HighestReceivedBlockDelay() primitives.Slot
IsCanonical(root [32]byte) bool
IsOptimistic(root [32]byte) (bool, error)
IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error)
JustifiedCheckpoint() *forkchoicetypes.Checkpoint
JustifiedPayloadBlockHash() [32]byte
LastRoot(primitives.Epoch) [32]byte
NodeCount() int
PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint
ProposerBoost() [fieldparams.RootLength]byte

View File

@@ -121,13 +121,6 @@ func (ro *ROForkChoice) HighestReceivedBlockRoot() [32]byte {
return ro.getter.HighestReceivedBlockRoot()
}
// HighestReceivedBlockDelay delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) HighestReceivedBlockDelay() primitives.Slot {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.HighestReceivedBlockDelay()
}
// ReceivedBlocksLastEpoch delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.l.RLock()
@@ -163,13 +156,6 @@ func (ro *ROForkChoice) Slot(root [32]byte) (primitives.Slot, error) {
return ro.getter.Slot(root)
}
// LastRoot delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) LastRoot(e primitives.Epoch) [32]byte {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.LastRoot(e)
}
// DependentRoot delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) DependentRoot(epoch primitives.Epoch) ([32]byte, error) {
ro.l.RLock()

View File

@@ -30,7 +30,6 @@ const (
nodeCountCalled
highestReceivedBlockSlotCalled
highestReceivedBlockRootCalled
highestReceivedBlockDelayCalled
receivedBlocksLastEpochCalled
weightCalled
isOptimisticCalled
@@ -118,11 +117,6 @@ func TestROLocking(t *testing.T) {
call: highestReceivedBlockSlotCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockSlot() },
},
{
name: "highestReceivedBlockDelayCalled",
call: highestReceivedBlockDelayCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockDelay() },
},
{
name: "receivedBlocksLastEpochCalled",
call: receivedBlocksLastEpochCalled,
@@ -148,11 +142,6 @@ func TestROLocking(t *testing.T) {
call: slotCalled,
cb: func(g FastGetter) { _, err := g.Slot([32]byte{}); _discard(t, err) },
},
{
name: "lastRootCalled",
call: lastRootCalled,
cb: func(g FastGetter) { g.LastRoot(0) },
},
{
name: "targetRootForEpochCalled",
call: targetRootForEpochCalled,
@@ -265,11 +254,6 @@ func (ro *mockROForkchoice) HighestReceivedBlockRoot() [32]byte {
return [32]byte{}
}
func (ro *mockROForkchoice) HighestReceivedBlockDelay() primitives.Slot {
ro.calls = append(ro.calls, highestReceivedBlockDelayCalled)
return 0
}
func (ro *mockROForkchoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.calls = append(ro.calls, receivedBlocksLastEpochCalled)
return 0, nil
@@ -295,11 +279,6 @@ func (ro *mockROForkchoice) Slot(_ [32]byte) (primitives.Slot, error) {
return 0, nil
}
func (ro *mockROForkchoice) LastRoot(_ primitives.Epoch) [32]byte {
ro.calls = append(ro.calls, lastRootCalled)
return [32]byte{}
}
// DependentRoot impoements FastGetter.
func (ro *mockROForkchoice) DependentRoot(_ primitives.Epoch) ([32]byte, error) {
ro.calls = append(ro.calls, dependentRootCalled)

View File

@@ -678,7 +678,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DB: b.db,
StateGen: b.stateGen,
ClockWaiter: b.ClockWaiter,
PartialDataColumns: b.cliCtx.Bool(flags.PartialDataColumns.Name),
})
if err != nil {
return err

View File

@@ -52,7 +52,6 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",

View File

@@ -343,7 +343,7 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
// cancelled (whichever comes first).
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error {
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
@@ -353,15 +353,16 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
return errors.Wrap(err, "current fork digest")
}
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars, partialColumns)
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
return nil
}
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) {
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network.
// For sidecars with available peers, it uses batch publishing.
// For sidecars without peers, it finds peers first and then publishes individually.
// Both paths run in parallel. It returns when all broadcasts are complete, or the context is cancelled.
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
@@ -371,8 +372,8 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
topicFunc := func(dcIndex uint64) (topic string, wrappedSubIdx uint64, subnet uint64) {
subnet = peerdas.ComputeSubnetForDataColumnSidecar(dcIndex)
topicFunc := func(sidecar blocks.VerifiedRODataColumn) (topic string, wrappedSubIdx uint64, subnet uint64) {
subnet = peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
topic = dataColumnSubnetToTopic(subnet, forkDigest)
wrappedSubIdx = subnet + dataColumnSubnetVal
return
@@ -385,7 +386,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
topic, wrappedSubIdx, _ := topicFunc(sidecar.Index)
topic, wrappedSubIdx, _ := topicFunc(sidecar)
// Check if we have a peer for this subnet (use RLock for read-only check).
mu := s.subnetLocker(wrappedSubIdx)
mu.RLock()
@@ -410,7 +411,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, _, _ := topicFunc(sidecar.Index)
topic, _, _ := topicFunc(sidecar)
if err := s.batchObject(ctx, &messageBatch, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
@@ -418,10 +419,6 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
return
}
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
// Record the timing for log purposes.
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
@@ -436,7 +433,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, wrappedSubIdx, subnet := topicFunc(sidecar.Index)
topic, wrappedSubIdx, subnet := topicFunc(sidecar)
// Find peers for this sidecar's subnet.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
@@ -461,32 +458,6 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
})
}
if s.partialColumnBroadcaster != nil {
// Note: There is not batch publish for partial columns.
for _, partialColumn := range partialColumns {
individualWg.Go(func() {
_, span := trace.StartSpan(ctx, "p2p.broadcastPartialDataColumn")
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, wrappedSubIdx, subnet := topicFunc(partialColumn.Index)
// Find peers for this sidecar's subnet.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
}
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}
})
}
}
// Wait for batch to be populated, then publish.
batchWg.Wait()
if len(sidecarsWithPeers) > 0 {

View File

@@ -803,7 +803,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar}, nil)
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
require.NoError(t, err)
// Receive the message.
@@ -969,7 +969,7 @@ func TestService_BroadcastDataColumnRoundRobin(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Broadcast all sidecars.
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars, nil)
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars)
require.NoError(t, err)
// Give some time for messages to be sent.
time.Sleep(100 * time.Millisecond)

View File

@@ -26,7 +26,6 @@ const (
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
PartialDataColumns bool
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool

View File

@@ -25,6 +25,7 @@ var gossipTopicMappings = map[string]func() proto.Message{
LightClientOptimisticUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientOptimisticUpdateAltair{} },
LightClientFinalityUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientFinalityUpdateAltair{} },
DataColumnSubnetTopicFormat: func() proto.Message { return &ethpb.DataColumnSidecar{} },
PayloadAttestationMessageTopicFormat: func() proto.Message { return &ethpb.PayloadAttestationMessage{} },
}
// GossipTopicMappings is a function to return the assigned data type
@@ -144,4 +145,7 @@ func init() {
// Specially handle Fulu objects.
GossipTypeMapping[reflect.TypeFor[*ethpb.SignedBeaconBlockFulu]()] = BlockSubnetTopicFormat
// Payload attestation messages.
GossipTypeMapping[reflect.TypeFor[*ethpb.PayloadAttestationMessage]()] = PayloadAttestationMessageTopicFormat
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -29,7 +28,6 @@ type (
Broadcaster
SetStreamHandler
PubSubProvider
PartialColumnBroadcasterProvider
PubSubTopicUser
SenderEncoder
PeerManager
@@ -54,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -94,11 +92,6 @@ type (
PubSub() *pubsub.PubSub
}
// PubSubProvider provides the p2p pubsub protocol.
PartialColumnBroadcasterProvider interface {
PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// PeerManager abstracts some peer management methods from libp2p.
PeerManager interface {
Disconnect(peer.ID) error

View File

@@ -157,11 +157,6 @@ var (
Help: "The number of publish messages received via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubRecvSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_pub_size_total",
Help: "The total size of publish messages received via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_total",
Help: "The number of messages dropped via rpc for a particular control message",
@@ -176,11 +171,6 @@ var (
Help: "The number of publish messages dropped via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubDropSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_pub_size_total",
Help: "The total size of publish messages dropped via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_total",
Help: "The number of messages sent via rpc for a particular control message",
@@ -195,16 +185,6 @@ var (
Help: "The number of publish messages sent via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
Help: "The total size of publish messages sent via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gossipsub_mesh_peers",
Help: "The number of capable peers in mesh",
},
[]string{"topic", "supports_partial"})
)
func (s *Service) updateMetrics() {

View File

@@ -1,28 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"metrics.go",
"partial.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster",
visibility = ["//visibility:public"],
deps = [
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//internal/logrusadapter:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,25 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_test")
go_test(
name = "go_default_test",
size = "medium",
srcs = ["two_node_test.go"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//x/simlibp2p:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_marcopolo_simnet//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,245 +0,0 @@
package integrationtest
import (
"context"
"crypto/rand"
"fmt"
"testing"
"testing/synctest"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
simlibp2p "github.com/libp2p/go-libp2p/x/simlibp2p"
"github.com/marcopolo/simnet"
"github.com/sirupsen/logrus"
)
// TestTwoNodePartialColumnExchange tests that two nodes can exchange partial columns
// and reconstruct the complete column. Node 1 has cells 0-2, Node 2 has cells 3-5.
// After exchange, both should have all cells.
func TestTwoNodePartialColumnExchange(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// Create a simulated libp2p network
latency := time.Millisecond * 10
network, meta, err := simlibp2p.SimpleLibp2pNetwork([]simlibp2p.NodeLinkSettingsAndCount{
{LinkSettings: simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
Uplink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
}, Count: 2},
}, simlibp2p.NetworkSettings{UseBlankHost: true})
require.NoError(t, err)
require.NoError(t, network.Start())
defer func() {
require.NoError(t, network.Close())
}()
defer func() {
for _, node := range meta.Nodes {
err := node.Close()
if err != nil {
panic(err)
}
}
}()
h1 := meta.Nodes[0]
h2 := meta.Nodes[1]
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
broadcaster1 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
broadcaster2 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
opts1 := broadcaster1.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
opts2 := broadcaster2.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps1, err := pubsub.NewGossipSub(ctx, h1, opts1...)
require.NoError(t, err)
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
}()
// Generate Test Data
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], []byte("test-block-root"))
numCells := 6
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
commitments[i] = make([]byte, 48)
cells[i] = make([]byte, 2048)
_, err := rand.Read(cells[i])
require.NoError(t, err)
proofs[i] = make([]byte, 48)
_ = fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
BodyRoot: blockRoot[:],
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
pc1, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
pc2, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
// Setup Topic and Subscriptions
digest := params.ForkDigest(0)
columnIndex := uint64(12)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topicStr := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) +
encoder.SszNetworkEncoder{}.ProtocolSuffix()
time.Sleep(100 * time.Millisecond)
topic1, err := ps1.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
}
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return true, fmt.Errorf("nil signed block header")
}
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
cellValidator := func(_ []blocks.CellProofBundle) error {
return nil
}
node1Complete := make(chan blocks.VerifiedRODataColumn, 1)
node2Complete := make(chan blocks.VerifiedRODataColumn, 1)
handler1 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 1: Completed! Column has %d cells", len(col.Column))
node1Complete <- col
}
handler2 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 2: Completed! Column has %d cells", len(col.Column))
node2Complete <- col
}
// Connect hosts
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
time.Sleep(300 * time.Millisecond)
// Subscribe to regular GossipSub (critical for partial message RPC exchange!)
sub1, err := topic1.Subscribe()
require.NoError(t, err)
defer sub1.Cancel()
sub2, err := topic2.Subscribe()
require.NoError(t, err)
defer sub2.Cancel()
noopHeaderHandler := func(header *ethpb.PartialDataColumnHeader, groupID string) {}
err = broadcaster1.Start(headerValidator, cellValidator, handler1, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster2.Start(headerValidator, cellValidator, handler2, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster1.Subscribe(topic1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2)
require.NoError(t, err)
// Wait for mesh to form
time.Sleep(2 * time.Second)
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2)
require.NoError(t, err)
// Wait for Completion
timeout := time.After(10 * time.Second)
var col1, col2 blocks.VerifiedRODataColumn
receivedCount := 0
for receivedCount < 2 {
select {
case col1 = <-node1Complete:
t.Log("Node 1 completed reconstruction")
receivedCount++
case col2 = <-node2Complete:
t.Log("Node 2 completed reconstruction")
receivedCount++
case <-timeout:
t.Fatalf("Timeout: Only %d/2 nodes completed", receivedCount)
}
}
// Verify both columns have all cells
assert.Equal(t, numCells, len(col1.Column), "Node 1 should have all cells")
assert.Equal(t, numCells, len(col2.Column), "Node 2 should have all cells")
assert.DeepSSZEqual(t, cells, col1.Column, "Node 1 cell mismatch")
assert.DeepSSZEqual(t, cells, col2.Column, "Node 2 cell mismatch")
})
}

View File

@@ -1,9 +0,0 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package partialdatacolumnbroadcaster
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "beacon-chain/p2p/partialdatacolumnbroadcaster")

View File

@@ -1,18 +0,0 @@
package partialdatacolumnbroadcaster
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
partialMessageUsefulCellsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_useful_cells_total",
Help: "Number of useful cells received via a partial message",
}, []string{"column_index"})
partialMessageCellsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_cells_received_total",
Help: "Number of total cells received via a partial message",
}, []string{"column_index"})
)

View File

@@ -1,621 +0,0 @@
package partialdatacolumnbroadcaster
import (
"bytes"
"log/slog"
"regexp"
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// TODOs:
// different eager push strategies:
// - no eager push
// - full column eager push
// - With debouncing - some factor of RTT
// - eager push missing cells
const TTLInSlots = 3
const maxConcurrentValidators = 128
const headerHandledTimeout = time.Second * 1
const maxConcurrentHeaderHandlers = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
func extractColumnIndexFromTopic(topic string) (uint64, error) {
matches := dataColumnTopicRegex.FindStringSubmatch(topic)
if len(matches) < 2 {
return 0, errors.New("could not extract column index from topic")
}
return strconv.ParseUint(matches[1], 10, 64)
}
// HeaderValidator validates a PartialDataColumnHeader.
// Returns (reject, err) where:
// - reject=true, err!=nil: REJECT - peer should be penalized
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader, groupID string)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
logger *logrus.Logger
ps *pubsub.PubSub
stop chan struct{}
validateHeader HeaderValidator
validateColumn ColumnValidator
handleColumn SubHandler
handleHeader HeaderHandler
// map groupID -> bool to signal when header has been handled
headerHandled map[string]bool
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
concurrentHeaderHandlerSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
groupTTL map[string]int8
// validHeaderCache caches validated headers by group ID (works across topics)
validHeaderCache map[string]*ethpb.PartialDataColumnHeader
incomingReq chan request
}
type requestKind uint8
const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindHandleIncomingRPC
requestKindCellsValidated
requestKindHeaderHandled
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
headerHandledGroup string
}
type publish struct {
topic string
c blocks.PartialDataColumn
}
type subscribe struct {
t *pubsub.Topic
}
type unsubscribe struct {
topic string
}
type rpcWithFrom struct {
*pubsub_pb.PartialMessagesExtension
from peer.ID
}
type cellsValidated struct {
validationTook time.Duration
topic string
group []byte
cellIndices []uint64
cells []blocks.CellProofBundle
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
headerHandled: make(map[string]bool),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentHeaderHandlerSemaphore: make(chan struct{}, maxConcurrentHeaderHandlers),
}
}
// AppendPubSubOpts adds the necessary pubsub options to enable partial messages.
func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubsub.Option {
slogger := slog.New(logrusadapter.Handler{Logger: p.logger})
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
if len(left) == 0 {
return right
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return partialmessages.PartsMetadata(merged)
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
incomingRPC: rpcWithFrom{rpc, from},
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
}
return nil
},
}),
func(ps *pubsub.PubSub) error {
p.ps = ps
return nil
},
)
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster.
// It accepts the required validator and handler functions, returning an error if any is nil.
// The event loop is launched in a goroutine.
func (p *PartialColumnBroadcaster) Start(
validateHeader HeaderValidator,
validateColumn ColumnValidator,
handleColumn SubHandler,
handleHeader HeaderHandler,
) error {
if validateHeader == nil {
return errors.New("no header validator provided")
}
if handleHeader == nil {
return errors.New("no header handler provided")
}
if validateColumn == nil {
return errors.New("no column validator provided")
}
if handleColumn == nil {
return errors.New("no column handler provided")
}
p.validateHeader = validateHeader
p.validateColumn = validateColumn
p.handleColumn = handleColumn
p.handleHeader = handleHeader
p.stop = make(chan struct{})
go p.loop()
return nil
}
func (p *PartialColumnBroadcaster) loop() {
cleanup := time.NewTicker(time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot))
defer cleanup.Stop()
for {
select {
case <-p.stop:
return
case <-cleanup.C:
for groupID, ttl := range p.groupTTL {
if ttl > 0 {
p.groupTTL[groupID] = ttl - 1
continue
}
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
delete(p.headerHandled, groupID)
for topic, msgStore := range p.partialMsgStore {
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
}
}
}
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
p.logger.Error("Failed to handle incoming partial RPC", "err", err)
}
case requestKindCellsValidated:
err := p.handleCellsValidated(req.cellsValidated)
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindHeaderHandled:
p.handleHeaderHandled(req.headerHandledGroup)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
}
}
}
func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *blocks.PartialDataColumn {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
return nil
}
msg, ok := topicStore[string(group)]
if !ok {
return nil
}
return msg
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
hasMessage := len(rpcWithFrom.PartialMessage) > 0
var message ethpb.PartialDataColumnSidecar
if hasMessage {
err := message.UnmarshalSSZ(rpcWithFrom.PartialMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal partial message data")
}
}
topicID := rpcWithFrom.GetTopicID()
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
// Check cache first for this group
if cachedHeader, ok := p.validHeaderCache[string(groupID)]; ok {
header = cachedHeader
} else {
// We haven't seen this group before. Check if we have a valid header.
if len(message.Header) == 0 {
p.logger.Debug("No partial column found and no header in message, ignoring")
return nil
}
header = message.Header[0]
reject, err := p.validateHeader(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
// REJECT case: penalize the peer
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
}
// Both REJECT and IGNORE: don't process further
return nil
}
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
p.headerHandled[string(groupID)] = false
p.concurrentHeaderHandlerSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentHeaderHandlerSemaphore
}()
p.handleHeader(header, string(groupID))
}()
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
if err != nil {
return err
}
newColumn, err := blocks.NewPartialDataColumn(
header.SignedBlockHeader,
columnIndex,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
if err != nil {
p.logger.WithError(err).WithFields(logrus.Fields{
"topic": topicID,
"columnIndex": columnIndex,
"numCommitments": len(header.KzgCommitments),
}).Error("Failed to create partial data column from header")
return err
}
// Save to store
topicStore, ok := p.partialMsgStore[topicID]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topicID] = topicStore
}
topicStore[string(newColumn.GroupID())] = &newColumn
p.groupTTL[string(newColumn.GroupID())] = TTLInSlots
ourDataColumn = &newColumn
shouldRepublish = true
}
if ourDataColumn == nil {
// We don't have a partial column for this. Can happen if we got cells
// without a header.
return nil
}
logger := p.logger.WithFields(logrus.Fields{
"from": rpcWithFrom.from,
"topic": topicID,
"group": groupID,
})
if len(rpcWithFrom.PartialMessage) > 0 {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
// Marco's thoughts. No, we don't need to do anything else here.
cellIndices, cellsToVerify, err := ourDataColumn.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return err
}
// Track cells received via partial message
if len(cellIndices) > 0 {
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
partialMessageCellsReceivedTotal.WithLabelValues(columnIndexStr).Add(float64(len(cellIndices)))
}
if len(cellsToVerify) > 0 {
p.concurrentValidatorSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := p.validateColumn(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
return
}
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackUsefulMessage)
p.incomingReq <- request{
kind: requestKindCellsValidated,
cellsValidated: &cellsValidated{
validationTook: time.Since(start),
topic: topicID,
group: groupID,
cells: cellsToVerify,
cellIndices: cellIndices,
},
}
}()
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
headerHandled, ok := p.headerHandled[string(groupID)]
// we only want to skip republishing if the header is currently being handled but hasn't been handled yet.
// If the header is NOT being handled at all, these incoming cells are likely in response to a previous publish we sent
// (either when got a data column sidecar, a beacon block body or if we are a block proposer).
if ok && !headerHandled {
p.logger.Debug("Header not handled, skipping republish", "topic", topicID, "group", groupID)
return nil
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) error {
ourDataColumn := p.getDataColumn(cells.topic, cells.group)
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
if extended {
// Track useful cells (cells that extended our data)
partialMessageUsefulCellsTotal.WithLabelValues(columnIndexStr).Add(float64(len(cells.cells)))
// TODO: we could use the heuristic here that if this data was
// useful to us, it's likely useful to our peers and we should
// republish eagerly
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
if p.handleColumn != nil {
go p.handleColumn(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
headerHandled, ok := p.headerHandled[string(ourDataColumn.GroupID())]
if ok && !headerHandled {
p.logger.Debug("Header not handled, skipping republish", "topic", cells.topic, "group", cells.group)
return nil
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) handleHeaderHandled(groupID string) {
p.headerHandled[groupID] = true
for topic, topicStore := range p.partialMsgStore {
col, ok := topicStore[groupID]
if !ok {
continue
}
err := p.ps.PublishPartialMessage(topic, col, partialmessages.PublishOptions{})
if err != nil {
p.logger.WithError(err).Error("Failed to republish after header handled", "topic", topic, "groupID", groupID)
}
}
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
}
}
// HeaderHandled notifies the event loop that a header has been fully processed,
// triggering republishing of all columns in the store for the given groupID.
func (p *PartialColumnBroadcaster) HeaderHandled(groupID string) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
p.incomingReq <- request{
kind: requestKindHeaderHandled,
headerHandledGroup: groupID,
}
return nil
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
var extended bool
existing := topicStore[string(c.GroupID())]
if existing != nil {
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) {
extended = existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
}
}
if extended {
if col, ok := existing.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", topic, "group", existing.GroupID())
if p.handleColumn != nil {
go p.handleColumn(topic, col)
}
}
}
} else {
topicStore[string(c.GroupID())] = &c
existing = &c
}
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
return nil
}
func (p *PartialColumnBroadcaster) Unsubscribe(topic string) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindUnsubscribe,
unsub: unsubscribe{
topic: topic,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
t, ok := p.topics[topic]
if !ok {
return errors.New("topic not found")
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
return t.Close()
}

View File

@@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")
assert.Equal(t, address, resAddress, "Unexpected address")
resDirection, err := p.Direction(id)
require.NoError(t, err)
@@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")
assert.Equal(t, address2, resAddress2, "Unexpected address")
resDirection2, err := p.Direction(id)
require.NoError(t, err)

View File

@@ -170,7 +170,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams(s.cfg.IPColocationWhitelist)),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(&gossipTracer{host: s.host, allowedTopics: filt}),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
}
if len(s.cfg.StaticPeers) > 0 {
@@ -181,9 +181,6 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}
psOpts = append(psOpts, pubsub.WithDirectPeers(directPeersAddrInfos))
}
if s.partialColumnBroadcaster != nil {
psOpts = s.partialColumnBroadcaster.AppendPubSubOpts(psOpts)
}
return psOpts
}

View File

@@ -1,8 +1,6 @@
package p2p
import (
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@@ -10,7 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
var _ = pubsub.RawTracer(&gossipTracer{})
var _ = pubsub.RawTracer(gossipTracer{})
// Initializes the values for the pubsub rpc action.
type action int
@@ -25,160 +23,85 @@ const (
// and broadcasted through gossipsub.
type gossipTracer struct {
host host.Host
allowedTopics pubsub.SubscriptionFilter
mu sync.Mutex
// map topic -> Set(peerID). Peer is in set if it supports partial messages.
partialMessagePeers map[string]map[peer.ID]struct{}
// map topic -> Set(peerID). Peer is in set if in the mesh.
meshPeers map[string]map[peer.ID]struct{}
}
// AddPeer .
func (g *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
// no-op
}
// RemovePeer .
func (g *gossipTracer) RemovePeer(p peer.ID) {
g.mu.Lock()
defer g.mu.Unlock()
for _, peers := range g.partialMessagePeers {
delete(peers, p)
}
for topic, peers := range g.meshPeers {
if _, ok := peers[p]; ok {
delete(peers, p)
g.updateMeshPeersMetric(topic)
}
}
func (g gossipTracer) RemovePeer(p peer.ID) {
// no-op
}
// Join .
func (g *gossipTracer) Join(topic string) {
func (g gossipTracer) Join(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(1)
g.mu.Lock()
defer g.mu.Unlock()
if g.partialMessagePeers == nil {
g.partialMessagePeers = make(map[string]map[peer.ID]struct{})
}
if g.partialMessagePeers[topic] == nil {
g.partialMessagePeers[topic] = make(map[peer.ID]struct{})
}
if g.meshPeers == nil {
g.meshPeers = make(map[string]map[peer.ID]struct{})
}
if g.meshPeers[topic] == nil {
g.meshPeers[topic] = make(map[peer.ID]struct{})
}
}
// Leave .
func (g *gossipTracer) Leave(topic string) {
func (g gossipTracer) Leave(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(0)
g.mu.Lock()
defer g.mu.Unlock()
delete(g.partialMessagePeers, topic)
delete(g.meshPeers, topic)
}
// Graft .
func (g *gossipTracer) Graft(p peer.ID, topic string) {
func (g gossipTracer) Graft(p peer.ID, topic string) {
pubsubTopicsGraft.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
m[p] = struct{}{}
}
g.updateMeshPeersMetric(topic)
}
// Prune .
func (g *gossipTracer) Prune(p peer.ID, topic string) {
func (g gossipTracer) Prune(p peer.ID, topic string) {
pubsubTopicsPrune.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
delete(m, p)
}
g.updateMeshPeersMetric(topic)
}
// ValidateMessage .
func (g *gossipTracer) ValidateMessage(msg *pubsub.Message) {
func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
pubsubMessageValidate.WithLabelValues(*msg.Topic).Inc()
}
// DeliverMessage .
func (g *gossipTracer) DeliverMessage(msg *pubsub.Message) {
func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
pubsubMessageDeliver.WithLabelValues(*msg.Topic).Inc()
}
// RejectMessage .
func (g *gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
pubsubMessageReject.WithLabelValues(*msg.Topic, reason).Inc()
}
// DuplicateMessage .
func (g *gossipTracer) DuplicateMessage(msg *pubsub.Message) {
func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
pubsubMessageDuplicate.WithLabelValues(*msg.Topic).Inc()
}
// UndeliverableMessage .
func (g *gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
pubsubMessageUndeliverable.WithLabelValues(*msg.Topic).Inc()
}
// ThrottlePeer .
func (g *gossipTracer) ThrottlePeer(p peer.ID) {
func (g gossipTracer) ThrottlePeer(p peer.ID) {
agent := agentFromPid(p, g.host.Peerstore())
pubsubPeerThrottle.WithLabelValues(agent).Inc()
}
// RecvRPC .
func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC) {
from := rpc.From()
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCPubRecvSize, pubsubRPCRecv, rpc)
g.mu.Lock()
defer g.mu.Unlock()
for _, sub := range rpc.Subscriptions {
topic := sub.GetTopicid()
if !g.allowedTopics.CanSubscribe(topic) {
continue
}
if g.partialMessagePeers == nil {
g.partialMessagePeers = make(map[string]map[peer.ID]struct{})
}
m, ok := g.partialMessagePeers[topic]
if !ok {
m = make(map[peer.ID]struct{})
g.partialMessagePeers[topic] = m
}
if sub.GetSubscribe() && sub.GetRequestsPartial() {
m[from] = struct{}{}
} else {
delete(m, from)
if len(m) == 0 {
delete(g.partialMessagePeers, topic)
}
}
}
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCRecv, rpc)
}
// SendRPC .
func (g *gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCPubSentSize, pubsubRPCSent, rpc)
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCSent, rpc)
}
// DropRPC .
func (g *gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCPubDropSize, pubsubRPCDrop, rpc)
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCDrop, rpc)
}
func (g *gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, pubSizeCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
subCtr.Add(float64(len(rpc.Subscriptions)))
if rpc.Control != nil {
ctrlCtr.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
@@ -187,41 +110,12 @@ func (g *gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, p
ctrlCtr.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
ctrlCtr.WithLabelValues("idontwant").Add(float64(len(rpc.Control.Idontwant)))
}
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
return
}
for _, msg := range rpc.Publish {
pubCtr.WithLabelValues(msg.GetTopic()).Inc()
pubSizeCtr.WithLabelValues(msg.GetTopic(), "false").Add(float64(msg.Size()))
}
if rpc.Partial != nil {
pubCtr.WithLabelValues(rpc.Partial.GetTopicID()).Inc()
pubSizeCtr.WithLabelValues(rpc.Partial.GetTopicID(), "true").Add(float64(rpc.Partial.Size()))
}
}
// updateMeshPeersMetric requires the caller to hold the state mutex
func (g *gossipTracer) updateMeshPeersMetric(topic string) {
meshPeers, ok := g.meshPeers[topic]
if !ok {
return
}
partialPeers, ok := g.partialMessagePeers[topic]
if !ok {
return
}
var supportsPartial, doesNotSupportPartial float64
for p := range meshPeers {
if _, ok := partialPeers[p]; ok {
supportsPartial++
} else {
doesNotSupportPartial++
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
continue
}
pubCtr.WithLabelValues(*msg.Topic).Inc()
}
pubsubMeshPeers.WithLabelValues(topic, "true").Set(supportsPartial)
pubsubMeshPeers.WithLabelValues(topic, "false").Set(doesNotSupportPartial)
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
@@ -78,7 +77,6 @@ type Service struct {
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
partialColumnBroadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
@@ -149,10 +147,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
custodyInfoSet: make(chan struct{}),
}
if cfg.PartialDataColumns {
s.partialColumnBroadcaster = partialdatacolumnbroadcaster.NewBroadcaster(log.Logger)
}
ipAddr := prysmnetwork.IPAddr()
opts, err := s.buildOptions(ipAddr, s.privKey)
@@ -320,7 +314,6 @@ func (s *Service) Stop() error {
if s.dv5Listener != nil {
s.dv5Listener.Close()
}
return nil
}
@@ -357,10 +350,6 @@ func (s *Service) PubSub() *pubsub.PubSub {
return s.pubsub
}
func (s *Service) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return s.partialColumnBroadcaster
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {

View File

@@ -21,7 +21,6 @@ go_library(
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -109,10 +108,6 @@ func (*FakeP2P) PubSub() *pubsub.PubSub {
return nil
}
func (*FakeP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// MetadataSeq -- fake.
func (*FakeP2P) MetadataSeq() uint64 {
return 0
@@ -174,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
return nil
}

View File

@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -139,6 +138,9 @@ func connect(a, b host.Host) error {
func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
p.t.Cleanup(func() {
require.NoError(p.t, h.Close())
})
if err := connect(h, p.BHost); err != nil {
p.t.Fatalf("Failed to connect two peers for RPC: %v", err)
}
@@ -170,6 +172,9 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
p.t.Cleanup(func() {
require.NoError(p.t, h.Close())
})
ps, err := pubsub.NewFloodSub(context.Background(), h,
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
@@ -243,7 +248,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}
@@ -309,10 +314,6 @@ func (p *TestP2P) PubSub() *pubsub.PubSub {
return p.pubsub
}
func (p *TestP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// Disconnect from a peer.
func (p *TestP2P) Disconnect(pid peer.ID) error {
return p.BHost.Network().ClosePeer(pid)

View File

@@ -46,6 +46,8 @@ const (
GossipLightClientOptimisticUpdateMessage = "light_client_optimistic_update"
// GossipDataColumnSidecarMessage is the name for the data column sidecar message type.
GossipDataColumnSidecarMessage = "data_column_sidecar"
// GossipPayloadAttestationMessage is the name for the payload attestation message type.
GossipPayloadAttestationMessage = "payload_attestation_message"
// Topic Formats
//
@@ -75,6 +77,8 @@ const (
LightClientOptimisticUpdateTopicFormat = GossipProtocolAndDigest + GossipLightClientOptimisticUpdateMessage
// DataColumnSubnetTopicFormat is the topic format for the data column subnet.
DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d"
// PayloadAttestationMessageTopicFormat is the topic format for payload attestation messages.
PayloadAttestationMessageTopicFormat = GossipProtocolAndDigest + GossipPayloadAttestationMessage
)
// topic is a struct representing a single gossipsub topic.
@@ -141,7 +145,7 @@ func (s *Service) allTopics() []topic {
cfg := params.BeaconConfig()
// bellatrix: no special topics; electra: blobs topics handled all together
genesis, altair, capella := cfg.GenesisEpoch, cfg.AltairForkEpoch, cfg.CapellaForkEpoch
deneb, fulu, future := cfg.DenebForkEpoch, cfg.FuluForkEpoch, cfg.FarFutureEpoch
deneb, fulu, gloas, future := cfg.DenebForkEpoch, cfg.FuluForkEpoch, cfg.GloasForkEpoch, cfg.FarFutureEpoch
// Templates are starter topics - they have a placeholder digest and the subnet is set to the maximum value
// for the subnet (see how this is used in allSubnetsBelow). These are not directly returned by the method,
// they are copied and modified for each digest where they apply based on the start and end epochs.
@@ -158,6 +162,7 @@ func (s *Service) allTopics() []topic {
newTopic(altair, future, empty, GossipLightClientOptimisticUpdateMessage),
newTopic(altair, future, empty, GossipLightClientFinalityUpdateMessage),
newTopic(capella, future, empty, GossipBlsToExecutionChangeMessage),
newTopic(gloas, future, empty, GossipPayloadAttestationMessage),
}
last := params.GetNetworkScheduleEntry(genesis)
schedule := []params.NetworkScheduleEntry{last}

View File

@@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
builderapi "github.com/OffchainLabs/prysm/v7/api/client/builder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/builder"
@@ -309,7 +308,6 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
}
rob, err := blocks.NewROBlockWithRoot(block, root)
var partialColumns []blocks.PartialDataColumn
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
if errors.Is(err, builderapi.ErrBadGateway) {
@@ -317,7 +315,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, partialColumns, err = vs.handleUnblindedBlock(rob, req)
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
@@ -337,7 +335,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
wg.Wait()
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars, partialColumns); err != nil {
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
}
if err := <-errChan; err != nil {
@@ -354,10 +352,9 @@ func (vs *Server) broadcastAndReceiveSidecars(
root [fieldparams.RootLength]byte,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSidecars []blocks.RODataColumn,
partialColumns []blocks.PartialDataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, partialColumns); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -406,41 +403,34 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, []blocks.PartialDataColumn, error) {
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if block.Version() >= version.Fulu {
// Compute cells and proofs from the blobs and cell proofs.
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
return nil, nil, errors.Wrap(err, "data column sidcars")
}
included := bitfield.NewBitlist(uint64(len(cellsPerBlob)))
included = included.Not() // all bits set to 1
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, partialColumns, nil
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "build blob sidecars")
return nil, nil, errors.Wrap(err, "build blob sidecars")
}
return blobSidecars, nil, nil, nil
return blobSidecars, nil, nil
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
@@ -507,7 +497,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
}
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn, partialColumns []blocks.PartialDataColumn) error {
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn) error {
// We built this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, sidecar := range roSidecars {
@@ -516,7 +506,7 @@ func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars
}
// Broadcast sidecars (non blocking).
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars, partialColumns); err != nil {
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars); err != nil {
return errors.Wrap(err, "broadcast data column sidecars")
}

View File

@@ -13,6 +13,7 @@ type writeOnlyGloasFields interface {
RotateBuilderPendingPayments() error
AppendBuilderPendingWithdrawals([]*ethpb.BuilderPendingWithdrawal) error
UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error
UpdatePendingPaymentWeight(att ethpb.Att, indices []uint64, participatedFlags map[uint8]bool) error
}
type readOnlyGloasFields interface {
@@ -20,5 +21,8 @@ type readOnlyGloasFields interface {
IsActiveBuilder(primitives.BuilderIndex) (bool, error)
CanBuilderCoverBid(primitives.BuilderIndex, primitives.Gwei) (bool, error)
LatestBlockHash() ([32]byte, error)
IsAttestationSameSlot(blockRoot [32]byte, slot primitives.Slot) (bool, error)
BuilderPendingPayment(index uint64) (*ethpb.BuilderPendingPayment, error)
BuilderPendingPayments() ([]*ethpb.BuilderPendingPayment, error)
ExecutionPayloadAvailability(slot primitives.Slot) (uint64, error)
}

Some files were not shown because too many files have changed in this diff Show More