Compare commits

..

33 Commits

Author SHA1 Message Date
satushh
0a22ae9803 no need to pass epoch to PTCDuties 2026-02-13 20:30:09 +05:30
satushh
3eb4f7bdba use correct function 2026-02-13 00:02:41 +05:30
satushh
b83907cdf3 Merge branch 'develop' into ptc-duty-endpoint 2026-02-12 23:58:20 +05:30
terence
63b69a63b1 Add gossip for payload envelope (#16349)
This PR implements gossip for execution payload envelope as outlined in
the following spec:

https://github.com/ethereum/consensus-specs/blob/master/specs/gloas/p2p-interface.md#execution_payload
2026-02-12 16:52:13 +00:00
satushh
fd98fbddd5 lint 2026-02-12 22:04:02 +05:30
satushh
5d528f8a69 use attestationDependentRoot in GetPTCDuties 2026-02-12 21:57:03 +05:30
satushh
cf63d112be Graffiti implementation (#16089)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

This PR implements graffiti as described in the corresponding spec doc
`graffiti-proposal-brief.md `

**Which issues(s) does this PR fix?**

- https://github.com/OffchainLabs/prysm/issues/13558

**Other notes for review**

**Acknowledgements**

- [ ] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [ ] I have added a description to this PR with sufficient context for
reviewers to understand this PR.
2026-02-12 12:35:15 +00:00
terence
6c045083a6 gloas: add modified attestation processing (#15736)
This PR implements
[process_attestation](https://github.com/ethereum/consensus-specs/blob/master/specs/gloas/beacon-chain.md#modified-process_attestation)
alongside spec tests

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2026-02-11 23:22:42 +00:00
james-prysm
09d0338aa9 adding db functions for saving gloas block and payload (#16301)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

gloas doesn't have the concept of blinded block anymore so instead we
save the full gloas block. that being said the full block does not
contain the payload envelope so there are separate functions for saving
those. this pr introduces these types and functions, the payload
envelope doesn't actually get saved yet in this pr.

a TODO comment is added for pruning as well

references epbs branch

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-11 17:59:25 +00:00
james-prysm
eaf3aa3e8e simplify get and post block parsing for REST (#16307)
**What type of PR is this?**

 Feature


**What does this PR do? Why is it needed?**

PR is attempts to remove code duplication and process through a map of
configurations for get and post block apis. this should simplify
maintainability.

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-10 18:28:55 +00:00
terence
9c49bb484c Add gossip for payload attestation (#16333)
This PR implements gossip validation and subscription for payload
attestation
2026-02-10 16:17:22 +00:00
terence
bb0f70ad60 gloas: add read only wrapper for payload envelope (#16339)
This PR adds read only wrapper for execution payload envelope
2026-02-09 17:23:12 +00:00
satushh
dc66f8872d Close libp2p host (#16313)
**What type of PR is this?**

 Other

**What does this PR do? Why is it needed?**

Close host to save resource

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [ ] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [ ] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Bastin <43618253+Inspector-Butters@users.noreply.github.com>
2026-02-09 14:40:07 +00:00
Preston Van Loon
db2bb5505c db: Copy byte slices that live outside of the view transaction (#16332)
**What type of PR is this?**

Bug fix

**What does this PR do? Why is it needed?**

The bbolt documentation suggests that the byte slices used during the
View transaction should not be used outside of the transaction and
mutating those slices could break things.

**Which issues(s) does this PR fix?**

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-07 14:10:07 +00:00
terence
14f01bbc6c gloas: move kzg commitments to bid (#16309)
This PR moves kzg commitments to bid. The rationale behind is captured
in this [issue](https://github.com/ethereum/consensus-specs/issues/4870)
* Moves blob KZG commitments to the earlier point where builder intent
is known
* Removes duplicated commitments from data column sidecars which saves
descent b/w per slot
* Enables nodes to start fetching blobs via getBlobs as soon as the bid
is received
* Slightly increases bid size and may add minor bidding channel latency
but the tradeoff favors lower network load and simpler DA handling
2026-02-06 23:07:54 +00:00
Potuz
c3e74e4a5d Remove unused method in forkchoice (#16337) 2026-02-06 19:21:58 +00:00
Potuz
e7ae6a004b Remove unused method in forkchoice (#16331) 2026-02-06 15:30:50 +00:00
satushh
d130778def Merge branch 'develop' into ptc-duty-endpoint 2026-02-06 13:48:51 +05:30
satushh
7529f9c6cb add endpoint to test 2026-02-05 20:49:29 +05:30
satushh
3c9881a675 lint 2026-02-05 20:11:42 +05:30
satushh
4b42fd5fc9 lint 2026-02-05 19:40:36 +05:30
Bastin
862fb2eb4a Fix gen-logs.sh - gitignore bug (#16328)
**What does this PR do? Why is it needed?**
`gen-logs.sh` was skipping `cmd/beacon-chain/execution/` due to a rule
in `.gitignore`.
Added a fix in `gen-logs.sh` to ignore `.gitignore` entries by
specification.
2026-02-05 14:06:42 +00:00
satushh
11aeed325e bazel build 2026-02-05 19:27:51 +05:30
Potuz
bb80a9c832 Remove unused map in forkchoice (#16329)
nodeByPayload was not being used.
2026-02-05 13:25:06 +00:00
satushh
55ad19a512 deduplication and bad request 2026-02-05 17:56:40 +05:30
Bastin
c1b668a50a Fix logging issue (#16322)
**What does this PR do? Why is it needed?**
This PR, in an attempt to fix the logging issue described in #16314,
does the following:
- Adds a new field `Identifier` to the `WriterHook` struct, and filters
out log entries that have the key `log_target` and the value of the
hook's `Identifier`. For now the identifiers are `ephemeral` and `user`,
differentiating between the user facing terminal/log file, and the
debugger facing ephemeral log file.
- Stores the value of the `--verbosity` and `--log.vmodule` flags in
`io/logs`, so it can be accessed by packages that need to know the
verbosity they're logging with. (note that since #16272 each package can
have a different verbosity, so verbosity is now defined per package
instead of globally)
- Improves the calculation of the global logging level by ignoring the
`ephemeralLogFileVerbosity` when the `--disable-ephemeral-log-file` flag
is enabled.
- Uses these added logic to fix the problem in
`logStateTransitionData()` (described in #16314)

Note: since we're saving this new data in `io/logs`, we should refactor
`prefixFormatter` to read the data from here. but that creates a
circular import error. I will try to fix this and refactor the formatter
in a future PR.

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-05 10:24:50 +00:00
satushh
bd70298e94 use correct formula for dependent root 2026-02-05 15:26:13 +05:30
satushh
b330a0571a unit tests and optimised get of duties 2026-02-05 13:26:00 +05:30
Justin Traglia
fab687d96d Improve ethspecify integration (#16304)
**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

* Move the ethspecify config from `/specrefs/.ethspecify` to
`/.ethspecify`.
* This allows developers to use inline specrefs (eg spec functions in
godoc comments).
* To do this, simply add a spec tag and run `ethspecify` to populate it.
* Clean up specref exceptions; organize by upgrade & put items in the
correct section.
* Update a few godoc comments to use the new inline specref feature.
* Update check-specrefs GitHub action so that it enforces up-to-date
godocs.
* Standardize specref naming; requiring a `#fork` tag for everything.
* Add new specrefs (which haven't been implemented yet) which were
missing.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
2026-02-04 18:44:01 +00:00
satushh
f73073cfdf fix some edge cases 2026-02-04 23:37:31 +05:30
james-prysm
cf94ccbf72 node fallback cleanup (#16316)
**What type of PR is this?**

 Other

**What does this PR do? Why is it needed?**

Follow up to https://github.com/OffchainLabs/prysm/pull/16215 this pr
improves logging, fixes stuttering in package naming, adds additional
unit tests, and deduplicates fallback node code.

**Which issues(s) does this PR fix?**

fixes a potential race if reconnecting to the same host very quickly
which has a stale connection still.

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-04 15:59:42 +00:00
satushh
0076fc78bd Fix PTC duties endpoint state selection and dependent root handling 2026-02-04 20:46:48 +05:30
satushh
af2a6c29a6 Implement /eth/v1/validator/duties/ptc/{epoch} endpoint initial commit 2026-02-04 18:06:09 +05:30
304 changed files with 12925 additions and 7449 deletions

View File

@@ -1,25 +1,39 @@
version: v1.7.0-alpha.1
version: v1.7.0-alpha.2
style: full
specrefs:
search_root: ..
search_root: .
auto_standardize_names: true
auto_add_missing_entries: true
require_exceptions_have_fork: true
files:
- configs.yml
- constants.yml
- containers.yml
- dataclasses.yml
- functions.yml
- presets.yml
- specrefs/configs.yml
- specrefs/constants.yml
- specrefs/containers.yml
- specrefs/dataclasses.yml
- specrefs/functions.yml
- specrefs/presets.yml
exceptions:
presets:
# Not implemented: gloas (future fork)
# gloas
- BUILDER_PENDING_WITHDRAWALS_LIMIT#gloas
- MAX_PAYLOAD_ATTESTATIONS#gloas
- PTC_SIZE#gloas
constants:
# Constants in the KZG library
# phase0
- BASIS_POINTS#phase0
- ENDIANNESS#phase0
- MAX_CONCURRENT_REQUESTS#phase0
- UINT64_MAX#phase0
- UINT64_MAX_SQRT#phase0
# altair
- PARTICIPATION_FLAG_WEIGHTS#altair
# bellatrix
- SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY#bellatrix
# deneb
- BLS_MODULUS#deneb
- BYTES_PER_COMMITMENT#deneb
- BYTES_PER_FIELD_ELEMENT#deneb
@@ -33,18 +47,9 @@ exceptions:
- PRIMITIVE_ROOT_OF_UNITY#deneb
- RANDOM_CHALLENGE_KZG_BATCH_DOMAIN#deneb
- RANDOM_CHALLENGE_KZG_CELL_BATCH_DOMAIN#fulu
# Not implemented
- BASIS_POINTS#phase0
- ENDIANNESS#phase0
- MAX_CONCURRENT_REQUESTS#phase0
- PARTICIPATION_FLAG_WEIGHTS#altair
- SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY#bellatrix
# fulu
- UINT256_MAX#fulu
- UINT64_MAX#phase0
- UINT64_MAX_SQRT#phase0
# Not implemented: gloas (future fork)
# gloas
- BUILDER_PAYMENT_THRESHOLD_DENOMINATOR#gloas
- BUILDER_PAYMENT_THRESHOLD_NUMERATOR#gloas
- BUILDER_WITHDRAWAL_PREFIX#gloas
@@ -61,61 +66,62 @@ exceptions:
- PTC_TIMELINESS_INDEX#gloas
configs:
# Not implemented: gloas (future fork)
# gloas
- AGGREGATE_DUE_BPS_GLOAS#gloas
- ATTESTATION_DUE_BPS_GLOAS#gloas
- CONTRIBUTION_DUE_BPS_GLOAS#gloas
- GLOAS_FORK_EPOCH#gloas
- GLOAS_FORK_VERSION#gloas
- MAX_REQUEST_PAYLOADS#gloas
- MIN_BUILDER_WITHDRAWABILITY_DELAY#gloas
- PAYLOAD_ATTESTATION_DUE_BPS#gloas
- SYNC_MESSAGE_DUE_BPS_GLOAS#gloas
- MIN_BUILDER_WITHDRAWABILITY_DELAY#gloas
ssz_objects:
# Not implemented
# phase0
- Eth1Block#phase0
- MatrixEntry#fulu
# Not implemented: capella
# capella
- LightClientBootstrap#capella
- LightClientFinalityUpdate#capella
- LightClientOptimisticUpdate#capella
- LightClientUpdate#capella
# Not implemented: gloas (future fork)
# fulu
- MatrixEntry#fulu
# gloas
- BeaconBlockBody#gloas
- BeaconState#gloas
- Builder#gloas
- BuilderPendingPayment#gloas
- BuilderPendingWithdrawal#gloas
- DataColumnSidecar#gloas
- ExecutionPayloadEnvelope#gloas
- ExecutionPayloadBid#gloas
- ExecutionPayloadEnvelope#gloas
- ForkChoiceNode#gloas
- IndexedPayloadAttestation#gloas
- PayloadAttestation#gloas
- PayloadAttestationData#gloas
- PayloadAttestationMessage#gloas
- SignedExecutionPayloadEnvelope#gloas
- SignedExecutionPayloadBid#gloas
- Builder#gloas
- ProposerPreferences#gloas
- SignedExecutionPayloadBid#gloas
- SignedExecutionPayloadEnvelope#gloas
- SignedProposerPreferences#gloas
dataclasses:
# Not implemented
- BlobParameters#fulu
- ExpectedWithdrawals#capella
- ExpectedWithdrawals#electra
# phase0
- LatestMessage#phase0
- LightClientStore#altair
- OptimisticStore#bellatrix
- Store#phase0
# Not implemented: capella
# altair
- LightClientStore#altair
# bellatrix
- OptimisticStore#bellatrix
# capella
- ExpectedWithdrawals#capella
- LightClientStore#capella
# Not implemented: gloas (future fork)
# electra
- ExpectedWithdrawals#electra
# fulu
- BlobParameters#fulu
# gloas
- ExpectedWithdrawals#gloas
- LatestMessage#gloas
- Store#gloas
@@ -140,7 +146,6 @@ exceptions:
- g1_lincomb#deneb
- hash_to_bls_field#deneb
- is_power_of_two#deneb
- multi_exp#deneb
- reverse_bits#deneb
- validate_kzg_g1#deneb
- verify_blob_kzg_proof#deneb
@@ -175,7 +180,12 @@ exceptions:
- verify_cell_kzg_proof_batch#fulu
- verify_cell_kzg_proof_batch_impl#fulu
# Not implemented: phase0
# phase0
- update_proposer_boost_root#phase0
- is_proposer_equivocation#phase0
- record_block_timeliness#phase0
- compute_proposer_score#phase0
- get_attestation_score#phase0
- calculate_committee_fraction#phase0
- compute_fork_version#phase0
- compute_pulled_up_tip#phase0
@@ -221,8 +231,7 @@ exceptions:
- validate_on_attestation#phase0
- validate_target_epoch_against_current_time#phase0
- xor#phase0
# Not implemented: altair
# altair
- compute_merkle_proof#altair
- compute_sync_committee_period_at_slot#altair
- get_contribution_and_proof#altair
@@ -244,27 +253,29 @@ exceptions:
- process_sync_committee_contributions#altair
- set_or_append_list#altair
- validate_light_client_update#altair
# Not implemented: bellatrix
# bellatrix
- get_execution_payload#bellatrix
- is_merge_transition_block#bellatrix
- is_optimistic_candidate_block#bellatrix
- latest_verified_ancestor#bellatrix
- prepare_execution_payload#bellatrix
# Not implemented: capella
# capella
- apply_withdrawals#capella
- get_balance_after_withdrawals#capella
- get_lc_execution_root#capella
- get_validators_sweep_withdrawals#capella
- is_valid_light_client_header#capella
- prepare_execution_payload#capella
- process_epoch#capella
- update_next_withdrawal_index#capella
- update_next_withdrawal_validator_index#capella
- upgrade_lc_bootstrap_to_capella#capella
- upgrade_lc_finality_update_to_capella#capella
- upgrade_lc_header_to_capella#capella
- upgrade_lc_optimistic_update_to_capella#capella
- upgrade_lc_store_to_capella#capella
- upgrade_lc_update_to_capella#capella
# Not implemented: deneb
# deneb
- get_lc_execution_root#deneb
- is_valid_light_client_header#deneb
- prepare_execution_payload#deneb
@@ -274,33 +285,34 @@ exceptions:
- upgrade_lc_optimistic_update_to_deneb#deneb
- upgrade_lc_store_to_deneb#deneb
- upgrade_lc_update_to_deneb#deneb
# Not implemented: electra
# electra
- compute_weak_subjectivity_period#electra
- current_sync_committee_gindex_at_slot#electra
- finalized_root_gindex_at_slot#electra
- get_eth1_vote#electra
- get_lc_execution_root#electra
- get_pending_partial_withdrawals#electra
- get_validators_sweep_withdrawals#electra
- is_compounding_withdrawal_credential#electra
- is_eligible_for_partial_withdrawals#electra
- is_within_weak_subjectivity_period#electra
- next_sync_committee_gindex_at_slot#electra
- normalize_merkle_branch#electra
- prepare_execution_payload#electra
- update_pending_partial_withdrawals#electra
- upgrade_lc_bootstrap_to_electra#electra
- upgrade_lc_finality_update_to_electra#electra
- upgrade_lc_header_to_electra#electra
- upgrade_lc_optimistic_update_to_electra#electra
- upgrade_lc_store_to_electra#electra
- upgrade_lc_update_to_electra#electra
# Not implemented: fulu
# fulu
- compute_matrix#fulu
- get_blob_parameters#fulu
- get_data_column_sidecars_from_block#fulu
- get_data_column_sidecars_from_column_sidecar#fulu
- recover_matrix#fulu
# Not implemented: gloas (future fork)
# gloas
- compute_balance_weighted_acceptance#gloas
- compute_balance_weighted_selection#gloas
- compute_fork_version#gloas
@@ -368,49 +380,42 @@ exceptions:
- verify_execution_payload_bid_signature#gloas
- add_builder_to_registry#gloas
- apply_deposit_for_builder#gloas
- apply_withdrawals#capella
- apply_withdrawals#gloas
- can_builder_cover_bid#gloas
- compute_proposer_score#phase0
- convert_builder_index_to_validator_index#gloas
- convert_validator_index_to_builder_index#gloas
- get_attestation_score#gloas
- get_attestation_score#phase0
- get_balance_after_withdrawals#capella
- get_builder_from_deposit#gloas
- get_builder_withdrawals#gloas
- get_builders_sweep_withdrawals#gloas
- get_index_for_new_builder#gloas
- get_pending_balance_to_withdraw_for_builder#gloas
- get_pending_partial_withdrawals#electra
- get_proposer_preferences_signature#gloas
- get_upcoming_proposal_slots#gloas
- get_validators_sweep_withdrawals#capella
- get_validators_sweep_withdrawals#electra
- initiate_builder_exit#gloas
- is_active_builder#gloas
- is_builder_index#gloas
- is_data_available#gloas
- is_eligible_for_partial_withdrawals#electra
- is_head_late#gloas
- is_head_weak#gloas
- is_parent_strong#gloas
- is_proposer_equivocation#phase0
- is_valid_proposal_slot#gloas
- onboard_builders_from_pending_deposits#gloas
- process_deposit_request#gloas
- process_voluntary_exit#gloas
- record_block_timeliness#gloas
- record_block_timeliness#phase0
- verify_data_column_sidecar_kzg_proofs#gloas
- should_apply_proposer_boost#gloas
- update_builder_pending_withdrawals#gloas
- update_next_withdrawal_builder_index#gloas
- update_next_withdrawal_index#capella
- update_next_withdrawal_validator_index#capella
- update_payload_expected_withdrawals#gloas
- update_pending_partial_withdrawals#electra
- update_proposer_boost_root#gloas
- update_proposer_boost_root#phase0
presets:
# gloas
- BUILDER_PENDING_WITHDRAWALS_LIMIT#gloas
- BUILDER_REGISTRY_LIMIT#gloas
- MAX_BUILDERS_PER_WITHDRAWALS_SWEEP#gloas

View File

@@ -12,11 +12,11 @@ jobs:
- name: Check version consistency
run: |
WORKSPACE_VERSION=$(grep 'consensus_spec_version = ' WORKSPACE | sed 's/.*"\(.*\)"/\1/')
ETHSPECIFY_VERSION=$(grep '^version:' specrefs/.ethspecify.yml | sed 's/version: //')
ETHSPECIFY_VERSION=$(grep '^version:' .ethspecify.yml | sed 's/version: //')
if [ "$WORKSPACE_VERSION" != "$ETHSPECIFY_VERSION" ]; then
echo "Version mismatch between WORKSPACE and ethspecify"
echo " WORKSPACE: $WORKSPACE_VERSION"
echo " specrefs/.ethspecify.yml: $ETHSPECIFY_VERSION"
echo " .ethspecify.yml: $ETHSPECIFY_VERSION"
exit 1
else
echo "Versions match: $WORKSPACE_VERSION"
@@ -26,7 +26,7 @@ jobs:
run: python3 -mpip install ethspecify
- name: Update spec references
run: ethspecify process --path=specrefs
run: ethspecify
- name: Check for differences
run: |
@@ -40,4 +40,4 @@ jobs:
fi
- name: Check spec references
run: ethspecify check --path=specrefs
run: ethspecify check

View File

@@ -273,16 +273,16 @@ filegroup(
url = "https://github.com/ethereum/EIPs/archive/5480440fe51742ed23342b68cf106cefd427e39d.tar.gz",
)
consensus_spec_version = "v1.7.0-alpha.1"
consensus_spec_version = "v1.7.0-alpha.2"
load("@prysm//tools:download_spectests.bzl", "consensus_spec_tests")
consensus_spec_tests(
name = "consensus_spec_tests",
flavors = {
"general": "sha256-j5R3jA7Oo4OSDMTvpMuD+8RomaCByeFSwtfkq6fL0Zg=",
"minimal": "sha256-tdTqByoyswOS4r6OxFmo70y2BP7w1TgEok+gf4cbxB0=",
"mainnet": "sha256-5gB4dt6SnSDKzdBc06VedId3NkgvSYyv9n9FRxWKwYI=",
"general": "sha256-iGQsGZ1cHah+2CSod9jC3kN8Ku4n6KO0hIwfINrn/po=",
"minimal": "sha256-TgcYt8N8sXSttdHTGvOa+exUZ1zn1UzlAMz0V7i37xc=",
"mainnet": "sha256-LnXyiLoJtrvEvbqLDSAAqpLMdN/lXv92SAgYG8fNjCs=",
},
version = consensus_spec_version,
)
@@ -298,7 +298,7 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-J+43DrK1pF658kTXTwMS6zGf4KDjvas++m8w2a8swpg=",
integrity = "sha256-Y/67Dg393PksZj5rTFNLntiJ6hNdB7Rxbu5gZE2gebY=",
strip_prefix = "consensus-specs-" + consensus_spec_version[1:],
url = "https://github.com/ethereum/consensus-specs/archive/refs/tags/%s.tar.gz" % consensus_spec_version,
)

19
api/fallback/BUILD.bazel Normal file
View File

@@ -0,0 +1,19 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"fallback.go",
"log.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/fallback",
visibility = ["//visibility:public"],
deps = ["@com_github_sirupsen_logrus//:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["fallback_test.go"],
embed = [":go_default_library"],
deps = ["//testing/assert:go_default_library"],
)

66
api/fallback/fallback.go Normal file
View File

@@ -0,0 +1,66 @@
package fallback
import (
"context"
"github.com/sirupsen/logrus"
)
// HostProvider is the subset of connection-provider methods that EnsureReady
// needs. Both grpc.GrpcConnectionProvider and rest.RestConnectionProvider
// satisfy this interface.
type HostProvider interface {
Hosts() []string
CurrentHost() string
SwitchHost(index int) error
}
// ReadyChecker can report whether the current endpoint is ready.
// iface.NodeClient satisfies this implicitly.
type ReadyChecker interface {
IsReady(ctx context.Context) bool
}
// EnsureReady iterates through the configured hosts and returns true as soon as
// one responds as ready. It starts from the provider's current host and wraps
// around using modular arithmetic, performing failover when a host is not ready.
func EnsureReady(ctx context.Context, provider HostProvider, checker ReadyChecker) bool {
hosts := provider.Hosts()
numHosts := len(hosts)
startingHost := provider.CurrentHost()
var attemptedHosts []string
// Find current index
currentIdx := 0
for i, h := range hosts {
if h == startingHost {
currentIdx = i
break
}
}
for i := range numHosts {
if checker.IsReady(ctx) {
if len(attemptedHosts) > 0 {
log.WithFields(logrus.Fields{
"previous": startingHost,
"current": provider.CurrentHost(),
"tried": attemptedHosts,
}).Info("Switched to responsive beacon node")
}
return true
}
attemptedHosts = append(attemptedHosts, provider.CurrentHost())
// Try next host if not the last iteration
if i < numHosts-1 {
nextIdx := (currentIdx + i + 1) % numHosts
if err := provider.SwitchHost(nextIdx); err != nil {
log.WithError(err).Error("Failed to switch host")
}
}
}
log.WithField("tried", attemptedHosts).Warn("No responsive beacon node found")
return false
}

View File

@@ -0,0 +1,94 @@
package fallback
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
)
// mockHostProvider is a minimal HostProvider for unit tests.
type mockHostProvider struct {
hosts []string
hostIndex int
}
func (m *mockHostProvider) Hosts() []string { return m.hosts }
func (m *mockHostProvider) CurrentHost() string {
return m.hosts[m.hostIndex%len(m.hosts)]
}
func (m *mockHostProvider) SwitchHost(index int) error { m.hostIndex = index; return nil }
// mockReadyChecker records per-call IsReady results in sequence.
type mockReadyChecker struct {
results []bool
idx int
}
func (m *mockReadyChecker) IsReady(_ context.Context) bool {
if m.idx >= len(m.results) {
return false
}
r := m.results[m.idx]
m.idx++
return r
}
func TestEnsureReady_SingleHostReady(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}
func TestEnsureReady_SingleHostNotReady(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_SingleHostError(t *testing.T) {
provider := &mockHostProvider{hosts: []string{"http://host1:3500"}, hostIndex: 0}
checker := &mockReadyChecker{results: []bool{false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_MultipleHostsFirstReady(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}
func TestEnsureReady_MultipleHostsFailoverToSecond(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{false, true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 1, provider.hostIndex)
}
func TestEnsureReady_MultipleHostsNoneReady(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"},
hostIndex: 0,
}
checker := &mockReadyChecker{results: []bool{false, false, false}}
assert.Equal(t, false, EnsureReady(t.Context(), provider, checker))
}
func TestEnsureReady_WrapAroundFromNonZeroIndex(t *testing.T) {
provider := &mockHostProvider{
hosts: []string{"http://host0:3500", "http://host1:3500", "http://host2:3500"},
hostIndex: 1,
}
// host1 (start) fails, host2 fails, host0 succeeds
checker := &mockReadyChecker{results: []bool{false, false, true}}
assert.Equal(t, true, EnsureReady(t.Context(), provider, checker))
assert.Equal(t, 0, provider.hostIndex)
}

View File

@@ -1,9 +1,9 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package blocks
package fallback
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "consensus-types/blocks")
var log = logrus.WithField("package", "api/fallback")

View File

@@ -25,6 +25,11 @@ type GrpcConnectionProvider interface {
// SwitchHost switches to the endpoint at the given index.
// The new connection is created lazily on next CurrentConn() call.
SwitchHost(index int) error
// ConnectionCounter returns a monotonically increasing counter that increments
// each time SwitchHost changes the active endpoint. This allows consumers to
// detect connection changes even when the host string returns to a previous value
// (e.g., host0 → host1 → host0).
ConnectionCounter() uint64
// Close closes the current connection.
Close()
}
@@ -38,6 +43,7 @@ type grpcConnectionProvider struct {
// Current connection state (protected by mutex)
currentIndex uint64
conn *grpc.ClientConn
connCounter uint64
mu sync.Mutex
closed bool
@@ -138,6 +144,7 @@ func (p *grpcConnectionProvider) SwitchHost(index int) error {
p.conn = nil // Clear immediately - new connection created lazily
p.currentIndex = uint64(index)
p.connCounter++
// Close old connection asynchronously to avoid blocking the caller
if oldConn != nil {
@@ -155,6 +162,12 @@ func (p *grpcConnectionProvider) SwitchHost(index int) error {
return nil
}
func (p *grpcConnectionProvider) ConnectionCounter() uint64 {
p.mu.Lock()
defer p.mu.Unlock()
return p.connCounter
}
func (p *grpcConnectionProvider) Close() {
p.mu.Lock()
defer p.mu.Unlock()

View File

@@ -4,17 +4,24 @@ import "google.golang.org/grpc"
// MockGrpcProvider implements GrpcConnectionProvider for testing.
type MockGrpcProvider struct {
MockConn *grpc.ClientConn
MockHosts []string
MockConn *grpc.ClientConn
MockHosts []string
CurrentIndex int
ConnCounter uint64
}
func (m *MockGrpcProvider) CurrentConn() *grpc.ClientConn { return m.MockConn }
func (m *MockGrpcProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[0]
return m.MockHosts[m.CurrentIndex]
}
return ""
}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(int) error { return nil }
func (m *MockGrpcProvider) Close() {}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(idx int) error {
m.CurrentIndex = idx
m.ConnCounter++
return nil
}
func (m *MockGrpcProvider) ConnectionCounter() uint64 { return m.ConnCounter }
func (m *MockGrpcProvider) Close() {}

View File

@@ -9,13 +9,13 @@ import (
// MockRestProvider implements RestConnectionProvider for testing.
type MockRestProvider struct {
MockClient *http.Client
MockHandler RestHandler
MockHandler Handler
MockHosts []string
HostIndex int
}
func (m *MockRestProvider) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestProvider) RestHandler() RestHandler { return m.MockHandler }
func (m *MockRestProvider) Handler() Handler { return m.MockHandler }
func (m *MockRestProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[m.HostIndex%len(m.MockHosts)]
@@ -25,25 +25,22 @@ func (m *MockRestProvider) CurrentHost() string {
func (m *MockRestProvider) Hosts() []string { return m.MockHosts }
func (m *MockRestProvider) SwitchHost(index int) error { m.HostIndex = index; return nil }
// MockRestHandler implements RestHandler for testing.
type MockRestHandler struct {
MockHost string
MockClient *http.Client
// MockHandler implements Handler for testing.
type MockHandler struct {
MockHost string
}
func (m *MockRestHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockRestHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
func (m *MockHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
return http.StatusOK, nil
}
func (m *MockRestHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
func (m *MockHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
func (m *MockHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
return nil
}
func (m *MockRestHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
func (m *MockHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestHandler) Host() string { return m.MockHost }
func (m *MockRestHandler) SwitchHost(host string) { m.MockHost = host }
func (m *MockHandler) Host() string { return m.MockHost }

View File

@@ -17,8 +17,8 @@ import (
type RestConnectionProvider interface {
// HttpClient returns the configured HTTP client with headers, timeout, and optional tracing.
HttpClient() *http.Client
// RestHandler returns the REST handler for making API requests.
RestHandler() RestHandler
// Handler returns the REST handler for making API requests.
Handler() Handler
// CurrentHost returns the current REST API endpoint URL.
CurrentHost() string
// Hosts returns all configured REST API endpoint URLs.
@@ -54,7 +54,7 @@ func WithTracing() RestConnectionProviderOption {
type restConnectionProvider struct {
endpoints []string
httpClient *http.Client
restHandler RestHandler
restHandler *handler
currentIndex atomic.Uint64
timeout time.Duration
headers map[string][]string
@@ -96,7 +96,7 @@ func NewRestConnectionProvider(endpoint string, opts ...RestConnectionProviderOp
}
// Create the REST handler with the HTTP client and initial host
p.restHandler = newRestHandler(*p.httpClient, endpoints[0])
p.restHandler = newHandler(*p.httpClient, endpoints[0])
log.WithFields(logrus.Fields{
"endpoints": endpoints,
@@ -124,7 +124,7 @@ func (p *restConnectionProvider) HttpClient() *http.Client {
return p.httpClient
}
func (p *restConnectionProvider) RestHandler() RestHandler {
func (p *restConnectionProvider) Handler() Handler {
return p.restHandler
}

View File

@@ -21,32 +21,35 @@ import (
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
// Handler defines the interface for making REST API requests.
type Handler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SwitchHost(host string)
}
type restHandler struct {
type handler struct {
client http.Client
host string
reqOverrides []reqOption
}
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
// newHandler returns a *handler for internal use within the rest package.
func newHandler(client http.Client, host string) *handler {
rh := &handler{
client: client,
host: host,
}
rh.appendAcceptOverride()
return rh
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
// NewHandler returns a Handler
func NewHandler(client http.Client, host string) Handler {
rh := &handler{
client: client,
host: host,
}
@@ -57,7 +60,7 @@ func NewRestHandler(client http.Client, host string) RestHandler {
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *restHandler) appendAcceptOverride() {
func (c *handler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
@@ -66,18 +69,18 @@ func (c *restHandler) appendAcceptOverride() {
}
// HttpClient returns the underlying HTTP client of the handler
func (c *restHandler) HttpClient() *http.Client {
func (c *handler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *restHandler) Host() string {
func (c *handler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (c *handler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -100,7 +103,7 @@ func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
func (c *handler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -119,7 +122,7 @@ func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int,
return httpResp.StatusCode, nil
}
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (c *handler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -174,7 +177,7 @@ func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Post(
func (c *handler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -210,7 +213,7 @@ func (c *restHandler) Post(
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *restHandler) PostSSZ(
func (c *handler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -311,6 +314,6 @@ func decodeResp(httpResp *http.Response, resp any) error {
return nil
}
func (c *restHandler) SwitchHost(host string) {
func (c *handler) SwitchHost(host string) {
c.host = host
}

View File

@@ -509,17 +509,17 @@ func (s *SignedBlindedBeaconBlockFulu) SigString() string {
// ----------------------------------------------------------------------------
type ExecutionPayloadBid struct {
ParentBlockHash string `json:"parent_block_hash"`
ParentBlockRoot string `json:"parent_block_root"`
BlockHash string `json:"block_hash"`
PrevRandao string `json:"prev_randao"`
FeeRecipient string `json:"fee_recipient"`
GasLimit string `json:"gas_limit"`
BuilderIndex string `json:"builder_index"`
Slot string `json:"slot"`
Value string `json:"value"`
ExecutionPayment string `json:"execution_payment"`
BlobKzgCommitmentsRoot string `json:"blob_kzg_commitments_root"`
ParentBlockHash string `json:"parent_block_hash"`
ParentBlockRoot string `json:"parent_block_root"`
BlockHash string `json:"block_hash"`
PrevRandao string `json:"prev_randao"`
FeeRecipient string `json:"fee_recipient"`
GasLimit string `json:"gas_limit"`
BuilderIndex string `json:"builder_index"`
Slot string `json:"slot"`
Value string `json:"value"`
ExecutionPayment string `json:"execution_payment"`
BlobKzgCommitments []string `json:"blob_kzg_commitments"`
}
type SignedExecutionPayloadBid struct {

View File

@@ -2939,18 +2939,22 @@ func SignedExecutionPayloadBidFromConsensus(b *eth.SignedExecutionPayloadBid) *S
}
func ExecutionPayloadBidFromConsensus(b *eth.ExecutionPayloadBid) *ExecutionPayloadBid {
blobKzgCommitments := make([]string, len(b.BlobKzgCommitments))
for i := range b.BlobKzgCommitments {
blobKzgCommitments[i] = hexutil.Encode(b.BlobKzgCommitments[i])
}
return &ExecutionPayloadBid{
ParentBlockHash: hexutil.Encode(b.ParentBlockHash),
ParentBlockRoot: hexutil.Encode(b.ParentBlockRoot),
BlockHash: hexutil.Encode(b.BlockHash),
PrevRandao: hexutil.Encode(b.PrevRandao),
FeeRecipient: hexutil.Encode(b.FeeRecipient),
GasLimit: fmt.Sprintf("%d", b.GasLimit),
BuilderIndex: fmt.Sprintf("%d", b.BuilderIndex),
Slot: fmt.Sprintf("%d", b.Slot),
Value: fmt.Sprintf("%d", b.Value),
ExecutionPayment: fmt.Sprintf("%d", b.ExecutionPayment),
BlobKzgCommitmentsRoot: hexutil.Encode(b.BlobKzgCommitmentsRoot),
ParentBlockHash: hexutil.Encode(b.ParentBlockHash),
ParentBlockRoot: hexutil.Encode(b.ParentBlockRoot),
BlockHash: hexutil.Encode(b.BlockHash),
PrevRandao: hexutil.Encode(b.PrevRandao),
FeeRecipient: hexutil.Encode(b.FeeRecipient),
GasLimit: fmt.Sprintf("%d", b.GasLimit),
BuilderIndex: fmt.Sprintf("%d", b.BuilderIndex),
Slot: fmt.Sprintf("%d", b.Slot),
Value: fmt.Sprintf("%d", b.Value),
ExecutionPayment: fmt.Sprintf("%d", b.ExecutionPayment),
BlobKzgCommitments: blobKzgCommitments,
}
}
@@ -3187,22 +3191,30 @@ func (b *ExecutionPayloadBid) ToConsensus() (*eth.ExecutionPayloadBid, error) {
if err != nil {
return nil, server.NewDecodeError(err, "ExecutionPayment")
}
blobKzgCommitmentsRoot, err := bytesutil.DecodeHexWithLength(b.BlobKzgCommitmentsRoot, fieldparams.RootLength)
err = slice.VerifyMaxLength(b.BlobKzgCommitments, fieldparams.MaxBlobCommitmentsPerBlock)
if err != nil {
return nil, server.NewDecodeError(err, "BlobKzgCommitmentsRoot")
return nil, server.NewDecodeError(err, "BlobKzgCommitments")
}
blobKzgCommitments := make([][]byte, len(b.BlobKzgCommitments))
for i, commitment := range b.BlobKzgCommitments {
kzg, err := bytesutil.DecodeHexWithLength(commitment, fieldparams.BLSPubkeyLength)
if err != nil {
return nil, server.NewDecodeError(err, fmt.Sprintf("BlobKzgCommitments[%d]", i))
}
blobKzgCommitments[i] = kzg
}
return &eth.ExecutionPayloadBid{
ParentBlockHash: parentBlockHash,
ParentBlockRoot: parentBlockRoot,
BlockHash: blockHash,
PrevRandao: prevRandao,
FeeRecipient: feeRecipient,
GasLimit: gasLimit,
BuilderIndex: primitives.BuilderIndex(builderIndex),
Slot: primitives.Slot(slot),
Value: primitives.Gwei(value),
ExecutionPayment: primitives.Gwei(executionPayment),
BlobKzgCommitmentsRoot: blobKzgCommitmentsRoot,
ParentBlockHash: parentBlockHash,
ParentBlockRoot: parentBlockRoot,
BlockHash: blockHash,
PrevRandao: prevRandao,
FeeRecipient: feeRecipient,
GasLimit: gasLimit,
BuilderIndex: primitives.BuilderIndex(builderIndex),
Slot: primitives.Slot(slot),
Value: primitives.Gwei(value),
ExecutionPayment: primitives.Gwei(executionPayment),
BlobKzgCommitments: blobKzgCommitments,
}, nil
}

View File

@@ -74,6 +74,18 @@ type SyncCommitteeDuty struct {
ValidatorSyncCommitteeIndices []string `json:"validator_sync_committee_indices"`
}
type GetPTCDutiesResponse struct {
DependentRoot string `json:"dependent_root"`
ExecutionOptimistic bool `json:"execution_optimistic"`
Data []*PTCDuty `json:"data"`
}
type PTCDuty struct {
Pubkey string `json:"pubkey"`
ValidatorIndex string `json:"validator_index"`
Slot string `json:"slot"`
}
// ProduceBlockV3Response is a wrapper json object for the returned block from the ProduceBlockV3 endpoint
type ProduceBlockV3Response struct {
Version string `json:"version"`

View File

@@ -27,6 +27,8 @@ go_library(
"receive_blob.go",
"receive_block.go",
"receive_data_column.go",
"receive_execution_payload_envelope.go",
"receive_payload_attestation_message.go",
"service.go",
"setup_forkchoice.go",
"tracked_proposer.go",
@@ -85,6 +87,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/logs:go_default_library",
"//math:go_default_library",
"//monitoring/tracing:go_default_library",
"//monitoring/tracing/trace:go_default_library",

View File

@@ -110,7 +110,7 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
ckzgCells := make([]ckzg4844.Cell, len(cells))
for i := range cells {
ckzgCells[i] = ckzg4844.Cell(cells[i])
copy(ckzgCells[i][:], cells[i][:])
}
return ckzg4844.VerifyCellKZGProofBatch(commitmentsBytes, cellIndices, ckzgCells, proofsBytes)
}

View File

@@ -10,6 +10,7 @@ import (
consensus_types "github.com/OffchainLabs/prysm/v7/consensus-types"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/io/logs"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
@@ -87,36 +88,45 @@ func logStateTransitionData(b interfaces.ReadOnlyBeaconBlock) error {
func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte, justified, finalized *ethpb.Checkpoint, receivedTime time.Time, genesis time.Time, daWaitedTime time.Duration) error {
startTime, err := slots.StartTime(genesis, block.Slot())
if err != nil {
return err
return errors.Wrap(err, "failed to get slot start time")
}
level := log.Logger.GetLevel()
parentRoot := block.ParentRoot()
blkRoot := fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8])
finalizedRoot := fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8])
sinceSlotStartTime := prysmTime.Now().Sub(startTime)
lessFields := logrus.Fields{
"slot": block.Slot(),
"block": blkRoot,
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": finalizedRoot,
"epoch": slots.ToEpoch(block.Slot()),
"sinceSlotStartTime": sinceSlotStartTime,
}
moreFields := logrus.Fields{
"slot": block.Slot(),
"slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch,
"block": blkRoot,
"epoch": slots.ToEpoch(block.Slot()),
"justifiedEpoch": justified.Epoch,
"justifiedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(justified.Root)[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": finalizedRoot,
"parentRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(parentRoot[:])[:8]),
"version": version.String(block.Version()),
"sinceSlotStartTime": sinceSlotStartTime,
"chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime) - daWaitedTime,
"dataAvailabilityWaitedTime": daWaitedTime,
}
level := logs.PackageVerbosity("beacon-chain/blockchain")
if level >= logrus.DebugLevel {
parentRoot := block.ParentRoot()
lf := logrus.Fields{
"slot": block.Slot(),
"slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch,
"block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
"epoch": slots.ToEpoch(block.Slot()),
"justifiedEpoch": justified.Epoch,
"justifiedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(justified.Root)[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8]),
"parentRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(parentRoot[:])[:8]),
"version": version.String(block.Version()),
"sinceSlotStartTime": prysmTime.Now().Sub(startTime),
"chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime) - daWaitedTime,
"dataAvailabilityWaitedTime": daWaitedTime,
}
log.WithFields(lf).Debug("Synced new block")
} else {
log.WithFields(logrus.Fields{
"slot": block.Slot(),
"block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
"finalizedEpoch": finalized.Epoch,
"finalizedRoot": fmt.Sprintf("0x%s...", hex.EncodeToString(finalized.Root)[:8]),
"epoch": slots.ToEpoch(block.Slot()),
}).Info("Synced new block")
log.WithFields(moreFields).Info("Synced new block")
return nil
}
log.WithFields(lessFields).WithField(logs.LogTargetField, logs.LogTargetUser).Info("Synced new block")
log.WithFields(moreFields).WithField(logs.LogTargetField, logs.LogTargetEphemeral).Info("Synced new block")
return nil
}

View File

@@ -0,0 +1,19 @@
package blockchain
import (
"context"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
)
// ExecutionPayloadEnvelopeReceiver interface defines the methods of chain service for receiving
// validated execution payload envelopes.
type ExecutionPayloadEnvelopeReceiver interface {
ReceiveExecutionPayloadEnvelope(context.Context, interfaces.ROSignedExecutionPayloadEnvelope) error
}
// ReceiveExecutionPayloadEnvelope accepts a signed execution payload envelope.
func (s *Service) ReceiveExecutionPayloadEnvelope(_ context.Context, _ interfaces.ROSignedExecutionPayloadEnvelope) error {
// TODO: wire into execution payload envelope processing pipeline.
return nil
}

View File

@@ -0,0 +1,19 @@
package blockchain
import (
"context"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
// PayloadAttestationReceiver interface defines the methods of chain service for receiving
// validated payload attestation messages.
type PayloadAttestationReceiver interface {
ReceivePayloadAttestationMessage(context.Context, *ethpb.PayloadAttestationMessage) error
}
// ReceivePayloadAttestationMessage accepts a payload attestation message.
func (s *Service) ReceivePayloadAttestationMessage(ctx context.Context, a *ethpb.PayloadAttestationMessage) error {
// TODO: Handle payload attestation message processing once Gloas is fully wired.
return nil
}

View File

@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -757,6 +757,16 @@ func (c *ChainService) ReceiveDataColumns(dcs []blocks.VerifiedRODataColumn) err
return nil
}
// ReceivePayloadAttestationMessage implements the same method in the chain service.
func (c *ChainService) ReceivePayloadAttestationMessage(_ context.Context, _ *ethpb.PayloadAttestationMessage) error {
return nil
}
// ReceiveExecutionPayloadEnvelope implements the same method in the chain service.
func (c *ChainService) ReceiveExecutionPayloadEnvelope(_ context.Context, _ interfaces.ROSignedExecutionPayloadEnvelope) error {
return nil
}
// DependentRootForEpoch mocks the same method in the chain service
func (c *ChainService) DependentRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) {
return c.TargetRoot, nil

View File

@@ -17,6 +17,7 @@ go_library(
"error.go",
"interfaces.go",
"log.go",
"payload_attestation.go",
"payload_id.go",
"proposer_indices.go",
"proposer_indices_disabled.go", # keep
@@ -76,6 +77,7 @@ go_test(
"checkpoint_state_test.go",
"committee_fuzz_test.go",
"committee_test.go",
"payload_attestation_test.go",
"payload_id_test.go",
"private_access_test.go",
"proposer_indices_test.go",

View File

@@ -0,0 +1,53 @@
package cache
import (
"sync"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
// PayloadAttestationCache tracks seen payload attestation messages for a single slot.
type PayloadAttestationCache struct {
slot primitives.Slot
seen map[primitives.ValidatorIndex]struct{}
mu sync.RWMutex
}
// Seen returns true if a vote for the given slot has already been
// processed for this validator index.
func (p *PayloadAttestationCache) Seen(slot primitives.Slot, idx primitives.ValidatorIndex) bool {
p.mu.RLock()
defer p.mu.RUnlock()
if p.slot != slot {
return false
}
if p.seen == nil {
return false
}
_, ok := p.seen[idx]
return ok
}
// Add marks the given slot and validator index as seen.
// This function assumes that the message has already been validated.
func (p *PayloadAttestationCache) Add(slot primitives.Slot, idx primitives.ValidatorIndex) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.slot != slot {
p.slot = slot
p.seen = make(map[primitives.ValidatorIndex]struct{})
}
if p.seen == nil {
p.seen = make(map[primitives.ValidatorIndex]struct{})
}
p.seen[idx] = struct{}{}
return nil
}
// Clear clears the internal cache.
func (p *PayloadAttestationCache) Clear() {
p.mu.Lock()
defer p.mu.Unlock()
p.slot = 0
p.seen = nil
}

View File

@@ -0,0 +1,48 @@
package cache_test
import (
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/stretchr/testify/require"
)
func TestPayloadAttestationCache_SeenAndAdd(t *testing.T) {
var c cache.PayloadAttestationCache
slot1 := primitives.Slot(1)
slot2 := primitives.Slot(2)
idx1 := primitives.ValidatorIndex(3)
idx2 := primitives.ValidatorIndex(4)
require.False(t, c.Seen(slot1, idx1))
require.NoError(t, c.Add(slot1, idx1))
require.True(t, c.Seen(slot1, idx1))
require.False(t, c.Seen(slot1, idx2))
require.False(t, c.Seen(slot2, idx1))
require.NoError(t, c.Add(slot1, idx2))
require.True(t, c.Seen(slot1, idx1))
require.True(t, c.Seen(slot1, idx2))
require.NoError(t, c.Add(slot2, idx1))
require.True(t, c.Seen(slot2, idx1))
require.False(t, c.Seen(slot1, idx1))
require.False(t, c.Seen(slot1, idx2))
}
func TestPayloadAttestationCache_Clear(t *testing.T) {
var c cache.PayloadAttestationCache
slot := primitives.Slot(10)
idx := primitives.ValidatorIndex(42)
require.NoError(t, c.Add(slot, idx))
require.True(t, c.Seen(slot, idx))
c.Clear()
require.False(t, c.Seen(slot, idx))
require.NoError(t, c.Add(slot, idx))
require.True(t, c.Seen(slot, idx))
}

View File

@@ -20,6 +20,7 @@ go_library(
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/epoch:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/gloas:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
@@ -75,7 +76,11 @@ func ProcessAttestationNoVerifySignature(
return nil, err
}
return SetParticipationAndRewardProposer(ctx, beaconState, att.GetData().Target.Epoch, indices, participatedFlags, totalBalance)
if err := beaconState.UpdatePendingPaymentWeight(att, indices, participatedFlags); err != nil {
return nil, errors.Wrap(err, "failed to update pending payment weight")
}
return SetParticipationAndRewardProposer(ctx, beaconState, att.GetData().Target.Epoch, indices, participatedFlags, totalBalance, att)
}
// SetParticipationAndRewardProposer retrieves and sets the epoch participation bits in state. Based on the epoch participation, it rewards
@@ -105,7 +110,9 @@ func SetParticipationAndRewardProposer(
beaconState state.BeaconState,
targetEpoch primitives.Epoch,
indices []uint64,
participatedFlags map[uint8]bool, totalBalance uint64) (state.BeaconState, error) {
participatedFlags map[uint8]bool,
totalBalance uint64,
att ethpb.Att) (state.BeaconState, error) {
var proposerRewardNumerator uint64
currentEpoch := time.CurrentEpoch(beaconState)
var stateErr error
@@ -299,6 +306,19 @@ func AttestationParticipationFlagIndices(beaconState state.ReadOnlyBeaconState,
participatedFlags[targetFlagIndex] = true
}
matchedSrcTgtHead := matchedHead && matchedSrcTgt
var beaconBlockRoot [32]byte
copy(beaconBlockRoot[:], data.BeaconBlockRoot)
matchingPayload, err := gloas.MatchingPayload(
beaconState,
beaconBlockRoot,
data.Slot,
uint64(data.CommitteeIndex),
)
if err != nil {
return nil, err
}
matchedSrcTgtHead = matchedSrcTgtHead && matchingPayload
if matchedSrcTgtHead && delay == cfg.MinAttestationInclusionDelay {
participatedFlags[headFlagIndex] = true
}

View File

@@ -1,7 +1,9 @@
package altair_test
import (
"bytes"
"fmt"
"reflect"
"testing"
"github.com/OffchainLabs/go-bitfield"
@@ -556,7 +558,7 @@ func TestSetParticipationAndRewardProposer(t *testing.T) {
b, err := helpers.TotalActiveBalance(beaconState)
require.NoError(t, err)
st, err := altair.SetParticipationAndRewardProposer(t.Context(), beaconState, test.epoch, test.indices, test.participatedFlags, b)
st, err := altair.SetParticipationAndRewardProposer(t.Context(), beaconState, test.epoch, test.indices, test.participatedFlags, b, &ethpb.Attestation{})
require.NoError(t, err)
i, err := helpers.BeaconProposerIndex(t.Context(), st)
@@ -775,11 +777,67 @@ func TestAttestationParticipationFlagIndices(t *testing.T) {
headFlagIndex: true,
},
},
{
name: "gloas same-slot committee index non-zero errors",
inputState: func() state.BeaconState {
stateSlot := primitives.Slot(5)
slot := primitives.Slot(3)
targetRoot := bytes.Repeat([]byte{0xAA}, 32)
headRoot := bytes.Repeat([]byte{0xBB}, 32)
prevRoot := bytes.Repeat([]byte{0xCC}, 32)
return buildGloasStateForFlags(t, stateSlot, slot, targetRoot, headRoot, prevRoot, 0, 0)
}(),
inputData: &ethpb.AttestationData{
Slot: 3,
CommitteeIndex: 1, // invalid for same-slot
BeaconBlockRoot: bytes.Repeat([]byte{0xBB}, 32),
Source: &ethpb.Checkpoint{Root: bytes.Repeat([]byte{0xDD}, 32)},
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: bytes.Repeat([]byte{0xAA}, 32),
},
},
inputDelay: 1,
participationIndices: nil,
},
{
name: "gloas payload availability matches committee index",
inputState: func() state.BeaconState {
stateSlot := primitives.Slot(5)
slot := primitives.Slot(3)
targetRoot := bytes.Repeat([]byte{0xAA}, 32)
headRoot := bytes.Repeat([]byte{0xBB}, 32)
// Same prev root to make SameSlotAttestation false and use payload availability.
return buildGloasStateForFlags(t, stateSlot, slot, targetRoot, headRoot, headRoot, 1, slot)
}(),
inputData: &ethpb.AttestationData{
Slot: 3,
CommitteeIndex: 1,
BeaconBlockRoot: bytes.Repeat([]byte{0xBB}, 32),
Source: &ethpb.Checkpoint{Root: bytes.Repeat([]byte{0xDD}, 32)},
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: bytes.Repeat([]byte{0xAA}, 32),
},
},
inputDelay: 1,
participationIndices: map[uint8]bool{
sourceFlagIndex: true,
targetFlagIndex: true,
headFlagIndex: true,
},
},
}
for _, test := range tests {
flagIndices, err := altair.AttestationParticipationFlagIndices(test.inputState, test.inputData, test.inputDelay)
if test.participationIndices == nil {
require.ErrorContains(t, "committee index", err)
continue
}
require.NoError(t, err)
require.DeepEqual(t, test.participationIndices, flagIndices)
if !reflect.DeepEqual(test.participationIndices, flagIndices) {
t.Fatalf("unexpected participation indices: got %v want %v", flagIndices, test.participationIndices)
}
}
}
@@ -858,3 +916,61 @@ func TestMatchingStatus(t *testing.T) {
require.Equal(t, test.matchedHead, head)
}
}
func buildGloasStateForFlags(t *testing.T, stateSlot, slot primitives.Slot, targetRoot, headRoot, prevRoot []byte, availabilityBit uint8, availabilitySlot primitives.Slot) state.BeaconState {
t.Helper()
cfg := params.BeaconConfig()
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
blockRoots[0] = targetRoot
blockRoots[slot%cfg.SlotsPerHistoricalRoot] = headRoot
blockRoots[(slot-1)%cfg.SlotsPerHistoricalRoot] = prevRoot
stateRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range stateRoots {
stateRoots[i] = make([]byte, fieldparams.RootLength)
}
randaoMixes := make([][]byte, cfg.EpochsPerHistoricalVector)
for i := range randaoMixes {
randaoMixes[i] = make([]byte, fieldparams.RootLength)
}
execPayloadAvailability := make([]byte, cfg.SlotsPerHistoricalRoot/8)
idx := availabilitySlot % cfg.SlotsPerHistoricalRoot
byteIndex := idx / 8
bitIndex := idx % 8
if availabilityBit == 1 {
execPayloadAvailability[byteIndex] |= 1 << bitIndex
}
checkpointRoot := bytes.Repeat([]byte{0xDD}, fieldparams.RootLength)
justified := &ethpb.Checkpoint{Root: checkpointRoot}
stProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
GenesisValidatorsRoot: bytes.Repeat([]byte{0x11}, fieldparams.RootLength),
BlockRoots: blockRoots,
StateRoots: stateRoots,
RandaoMixes: randaoMixes,
ExecutionPayloadAvailability: execPayloadAvailability,
CurrentJustifiedCheckpoint: justified,
PreviousJustifiedCheckpoint: justified,
Validators: []*ethpb.Validator{
{
EffectiveBalance: cfg.MinActivationBalance,
WithdrawalCredentials: append([]byte{cfg.ETH1AddressWithdrawalPrefixByte}, bytes.Repeat([]byte{0x01}, 31)...),
},
},
Balances: []uint64{cfg.MinActivationBalance},
BuilderPendingPayments: make([]*ethpb.BuilderPendingPayment, cfg.SlotsPerEpoch*2),
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x01}, 4),
PreviousVersion: bytes.Repeat([]byte{0x01}, 4),
Epoch: 0,
},
}
beaconState, err := state_native.InitializeFromProtoGloas(stProto)
require.NoError(t, err)
return beaconState
}

View File

@@ -111,10 +111,21 @@ func VerifyAttestationNoVerifySignature(
var indexedAtt ethpb.IndexedAtt
if att.Version() >= version.Electra {
if att.GetData().CommitteeIndex != 0 {
return errors.New("committee index must be 0 post-Electra")
ci := att.GetData().CommitteeIndex
// Spec v1.7.0-alpha pseudocode:
//
// # [Modified in Gloas:EIP7732]
// assert data.index < 2
//
if beaconState.Version() >= version.Gloas {
if ci >= 2 {
return fmt.Errorf("incorrect committee index %d", ci)
}
} else {
if ci != 0 {
return errors.New("committee index must be 0 between Electra and Gloas forks")
}
}
aggBits := att.GetAggregationBits()
committeeIndices := att.CommitteeBitsVal().BitIndices()
committees := make([][]primitives.ValidatorIndex, len(committeeIndices))

View File

@@ -1,6 +1,7 @@
package blocks_test
import (
"bytes"
"context"
"testing"
@@ -262,7 +263,7 @@ func TestVerifyAttestationNoVerifySignature_Electra(t *testing.T) {
CommitteeBits: bitfield.NewBitvector64(),
}
err = blocks.VerifyAttestationNoVerifySignature(context.TODO(), beaconState, att)
assert.ErrorContains(t, "committee index must be 0 post-Electra", err)
assert.ErrorContains(t, "committee index must be 0", err)
})
t.Run("index of committee too big", func(t *testing.T) {
aggBits := bitfield.NewBitlist(3)
@@ -314,6 +315,75 @@ func TestVerifyAttestationNoVerifySignature_Electra(t *testing.T) {
})
}
func TestVerifyAttestationNoVerifySignature_GloasCommitteeIndexLimit(t *testing.T) {
cfg := params.BeaconConfig()
stateSlot := cfg.MinAttestationInclusionDelay + 1
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range blockRoots {
blockRoots[i] = make([]byte, fieldparams.RootLength)
}
stateRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for i := range stateRoots {
stateRoots[i] = make([]byte, fieldparams.RootLength)
}
randaoMixes := make([][]byte, cfg.EpochsPerHistoricalVector)
for i := range randaoMixes {
randaoMixes[i] = make([]byte, fieldparams.RootLength)
}
checkpointRoot := bytes.Repeat([]byte{0xAA}, fieldparams.RootLength)
justified := &ethpb.Checkpoint{Epoch: 0, Root: checkpointRoot}
gloasStateProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
GenesisValidatorsRoot: bytes.Repeat([]byte{0x11}, fieldparams.RootLength),
BlockRoots: blockRoots,
StateRoots: stateRoots,
RandaoMixes: randaoMixes,
ExecutionPayloadAvailability: make([]byte, cfg.SlotsPerHistoricalRoot/8),
CurrentJustifiedCheckpoint: justified,
PreviousJustifiedCheckpoint: justified,
Validators: []*ethpb.Validator{
{
EffectiveBalance: cfg.MinActivationBalance,
WithdrawalCredentials: append([]byte{cfg.ETH1AddressWithdrawalPrefixByte}, bytes.Repeat([]byte{0x01}, 31)...),
},
},
Balances: []uint64{cfg.MinActivationBalance},
BuilderPendingPayments: make([]*ethpb.BuilderPendingPayment, cfg.SlotsPerEpoch*2),
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x01}, 4),
PreviousVersion: bytes.Repeat([]byte{0x01}, 4),
Epoch: 0,
},
}
beaconState, err := state_native.InitializeFromProtoGloas(gloasStateProto)
require.NoError(t, err)
committeeBits := bitfield.NewBitvector64()
committeeBits.SetBitAt(0, true)
aggBits := bitfield.NewBitlist(1)
aggBits.SetBitAt(0, true)
att := &ethpb.AttestationElectra{
Data: &ethpb.AttestationData{
Slot: 0,
CommitteeIndex: 2, // invalid for Gloas (must be <2)
BeaconBlockRoot: blockRoots[0],
Source: justified,
Target: justified,
},
AggregationBits: aggBits,
CommitteeBits: committeeBits,
Signature: bytes.Repeat([]byte{0x00}, fieldparams.BLSSignatureLength),
}
err = blocks.VerifyAttestationNoVerifySignature(context.TODO(), beaconState, att)
assert.ErrorContains(t, "incorrect committee index 2", err)
}
func TestConvertToIndexed_OK(t *testing.T) {
helpers.ClearCache()
validators := make([]*ethpb.Validator, 2*params.BeaconConfig().SlotsPerEpoch)
@@ -583,6 +653,7 @@ func TestVerifyAttestations_HandlesPlannedFork(t *testing.T) {
}
func TestRetrieveAttestationSignatureSet_VerifiesMultipleAttestations(t *testing.T) {
helpers.ClearCache()
ctx := t.Context()
numOfValidators := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(4))
validators := make([]*ethpb.Validator, numOfValidators)

View File

@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"attestation.go",
"bid.go",
"payload_attestation.go",
"pending_payment.go",
@@ -26,6 +27,7 @@ go_library(
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
@@ -34,6 +36,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"attestation_test.go",
"bid_test.go",
"payload_attestation_test.go",
"pending_payment_test.go",

View File

@@ -0,0 +1,52 @@
package gloas
import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/pkg/errors"
)
// MatchingPayload returns true if the attestation's committee index matches the expected payload index.
//
// For pre-Gloas forks, this always returns true.
//
// Spec v1.7.0-alpha (pseudocode):
//
// # [New in Gloas:EIP7732]
// if is_attestation_same_slot(state, data):
// assert data.index == 0
// payload_matches = True
// else:
// slot_index = data.slot % SLOTS_PER_HISTORICAL_ROOT
// payload_index = state.execution_payload_availability[slot_index]
// payload_matches = data.index == payload_index
func MatchingPayload(
beaconState state.ReadOnlyBeaconState,
beaconBlockRoot [32]byte,
slot primitives.Slot,
committeeIndex uint64,
) (bool, error) {
if beaconState.Version() < version.Gloas {
return true, nil
}
sameSlot, err := beaconState.IsAttestationSameSlot(beaconBlockRoot, slot)
if err != nil {
return false, errors.Wrap(err, "failed to get same slot attestation status")
}
if sameSlot {
if committeeIndex != 0 {
return false, fmt.Errorf("committee index %d for same slot attestation must be 0", committeeIndex)
}
return true, nil
}
executionPayloadAvail, err := beaconState.ExecutionPayloadAvailability(slot)
if err != nil {
return false, errors.Wrap(err, "failed to get execution payload availability status")
}
return executionPayloadAvail == committeeIndex, nil
}

View File

@@ -0,0 +1,110 @@
package gloas
import (
"bytes"
"testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func buildStateWithBlockRoots(t *testing.T, stateSlot primitives.Slot, roots map[primitives.Slot][]byte) *state_native.BeaconState {
t.Helper()
cfg := params.BeaconConfig()
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
for slot, root := range roots {
blockRoots[slot%cfg.SlotsPerHistoricalRoot] = root
}
stProto := &ethpb.BeaconStateGloas{
Slot: stateSlot,
BlockRoots: blockRoots,
}
state, err := state_native.InitializeFromProtoGloas(stProto)
require.NoError(t, err)
return state.(*state_native.BeaconState)
}
func TestMatchingPayload(t *testing.T) {
t.Run("pre-gloas always true", func(t *testing.T) {
stIface, err := state_native.InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
require.NoError(t, err)
ok, err := MatchingPayload(stIface, [32]byte{}, 0, 123)
require.NoError(t, err)
require.Equal(t, true, ok)
})
t.Run("same slot requires committee index 0", func(t *testing.T) {
root := bytes.Repeat([]byte{0xAA}, 32)
state := buildStateWithBlockRoots(t, 6, map[primitives.Slot][]byte{
4: root,
3: bytes.Repeat([]byte{0xBB}, 32),
})
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 1)
require.ErrorContains(t, "committee index", err)
require.Equal(t, false, ok)
})
t.Run("same slot matches when committee index is 0", func(t *testing.T) {
root := bytes.Repeat([]byte{0xAA}, 32)
state := buildStateWithBlockRoots(t, 6, map[primitives.Slot][]byte{
4: root,
3: bytes.Repeat([]byte{0xBB}, 32),
})
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 0)
require.NoError(t, err)
require.Equal(t, true, ok)
})
t.Run("non same slot checks payload availability", func(t *testing.T) {
cfg := params.BeaconConfig()
root := bytes.Repeat([]byte{0xAA}, 32)
blockRoots := make([][]byte, cfg.SlotsPerHistoricalRoot)
blockRoots[4%cfg.SlotsPerHistoricalRoot] = bytes.Repeat([]byte{0xCC}, 32)
blockRoots[3%cfg.SlotsPerHistoricalRoot] = bytes.Repeat([]byte{0xBB}, 32)
availability := make([]byte, cfg.SlotsPerHistoricalRoot/8)
slotIndex := uint64(4)
availability[slotIndex/8] = byte(1 << (slotIndex % 8))
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Slot: 6,
BlockRoots: blockRoots,
ExecutionPayloadAvailability: availability,
Fork: &ethpb.Fork{
CurrentVersion: bytes.Repeat([]byte{0x66}, 4),
PreviousVersion: bytes.Repeat([]byte{0x66}, 4),
Epoch: 0,
},
})
require.NoError(t, err)
state := stIface.(*state_native.BeaconState)
require.Equal(t, version.Gloas, state.Version())
var rootArr [32]byte
copy(rootArr[:], root)
ok, err := MatchingPayload(state, rootArr, 4, 1)
require.NoError(t, err)
require.Equal(t, true, ok)
ok, err = MatchingPayload(state, rootArr, 4, 0)
require.NoError(t, err)
require.Equal(t, false, ok)
})
}

View File

@@ -17,27 +17,56 @@ import (
)
// ProcessExecutionPayloadBid processes a signed execution payload bid in the Gloas fork.
// Spec v1.7.0-alpha.0 (pseudocode):
// process_execution_payload_bid(state: BeaconState, block: BeaconBlock):
//
// signed_bid = block.body.signed_execution_payload_bid
// bid = signed_bid.message
// builder_index = bid.builder_index
// amount = bid.value
// if builder_index == BUILDER_INDEX_SELF_BUILD:
// assert amount == 0
// assert signed_bid.signature == G2_POINT_AT_INFINITY
// else:
// assert is_active_builder(state, builder_index)
// assert can_builder_cover_bid(state, builder_index, amount)
// assert verify_execution_payload_bid_signature(state, signed_bid)
// assert bid.slot == block.slot
// assert bid.parent_block_hash == state.latest_block_hash
// assert bid.parent_block_root == block.parent_root
// assert bid.prev_randao == get_randao_mix(state, get_current_epoch(state))
// if amount > 0:
// state.builder_pending_payments[...] = BuilderPendingPayment(weight=0, withdrawal=BuilderPendingWithdrawal(fee_recipient=bid.fee_recipient, amount=amount, builder_index=builder_index))
// state.latest_execution_payload_bid = bid
// <spec fn="process_execution_payload_bid" fork="gloas" hash="823c9f3a">
// def process_execution_payload_bid(state: BeaconState, block: BeaconBlock) -> None:
// signed_bid = block.body.signed_execution_payload_bid
// bid = signed_bid.message
// builder_index = bid.builder_index
// amount = bid.value
//
// # For self-builds, amount must be zero regardless of withdrawal credential prefix
// if builder_index == BUILDER_INDEX_SELF_BUILD:
// assert amount == 0
// assert signed_bid.signature == bls.G2_POINT_AT_INFINITY
// else:
// # Verify that the builder is active
// assert is_active_builder(state, builder_index)
// # Verify that the builder has funds to cover the bid
// assert can_builder_cover_bid(state, builder_index, amount)
// # Verify that the bid signature is valid
// assert verify_execution_payload_bid_signature(state, signed_bid)
//
// # Verify commitments are under limit
// assert (
// len(bid.blob_kzg_commitments)
// <= get_blob_parameters(get_current_epoch(state)).max_blobs_per_block
// )
//
// # Verify that the bid is for the current slot
// assert bid.slot == block.slot
// # Verify that the bid is for the right parent block
// assert bid.parent_block_hash == state.latest_block_hash
// assert bid.parent_block_root == block.parent_root
// assert bid.prev_randao == get_randao_mix(state, get_current_epoch(state))
//
// # Record the pending payment if there is some payment
// if amount > 0:
// pending_payment = BuilderPendingPayment(
// weight=0,
// withdrawal=BuilderPendingWithdrawal(
// fee_recipient=bid.fee_recipient,
// amount=amount,
// builder_index=builder_index,
// ),
// )
// state.builder_pending_payments[SLOTS_PER_EPOCH + bid.slot % SLOTS_PER_EPOCH] = (
// pending_payment
// )
//
// # Cache the signed execution payload bid
// state.latest_execution_payload_bid = bid
// </spec>
func ProcessExecutionPayloadBid(st state.BeaconState, block interfaces.ReadOnlyBeaconBlock) error {
signedBid, err := block.Body().SignedExecutionPayloadBid()
if err != nil {
@@ -86,6 +115,12 @@ func ProcessExecutionPayloadBid(st state.BeaconState, block interfaces.ReadOnlyB
}
}
maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(block.Slot()))
commitmentCount := bid.BlobKzgCommitmentCount()
if commitmentCount > uint64(maxBlobsPerBlock) {
return fmt.Errorf("bid has %d blob KZG commitments over max %d", commitmentCount, maxBlobsPerBlock)
}
if err := validateBidConsistency(st, bid, block); err != nil {
return errors.Wrap(err, "bid consistency validation failed")
}

View File

@@ -184,6 +184,28 @@ func signBid(t *testing.T, sk common.SecretKey, bid *ethpb.ExecutionPayloadBid,
return out
}
func blobCommitmentsForSlot(slot primitives.Slot, count int) [][]byte {
max := int(params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(slot)))
if count > max {
count = max
}
commitments := make([][]byte, count)
for i := range commitments {
commitments[i] = bytes.Repeat([]byte{0xEE}, 48)
}
return commitments
}
func tooManyBlobCommitmentsForSlot(slot primitives.Slot) [][]byte {
max := int(params.BeaconConfig().MaxBlobsPerBlockAtEpoch(slots.ToEpoch(slot)))
count := max + 1
commitments := make([][]byte, count)
for i := range commitments {
commitments[i] = bytes.Repeat([]byte{0xEE}, 48)
}
return commitments
}
func TestProcessExecutionPayloadBid_SelfBuildSuccess(t *testing.T) {
slot := primitives.Slot(12)
proposerIdx := primitives.ValidatorIndex(0)
@@ -194,17 +216,17 @@ func TestProcessExecutionPayloadBid_SelfBuildSuccess(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 0,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 0,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
@@ -236,16 +258,16 @@ func TestProcessExecutionPayloadBid_SelfBuildNonZeroAmountFails(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, [48]byte{})
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xCC}, 32),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
@@ -280,17 +302,17 @@ func TestProcessExecutionPayloadBid_PendingPaymentAndCacheBid(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, balance, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 500_000,
ExecutionPayment: 1,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 500_000,
ExecutionPayment: 1,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
@@ -341,17 +363,17 @@ func TestProcessExecutionPayloadBid_BuilderNotActive(t *testing.T) {
state = stateIface.(*state_native.BeaconState)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x03}, 32),
BlockHash: bytes.Repeat([]byte{0x04}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x05}, 32),
FeeRecipient: bytes.Repeat([]byte{0x06}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x03}, 32),
BlockHash: bytes.Repeat([]byte{0x04}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x06}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -394,17 +416,17 @@ func TestProcessExecutionPayloadBid_CannotCoverBid(t *testing.T) {
state = stateIface.(*state_native.BeaconState)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 25,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 25,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -436,17 +458,17 @@ func TestProcessExecutionPayloadBid_InvalidSignature(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xEE}, 32),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 10,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
// Use an invalid signature.
invalidSig := [96]byte{1}
@@ -463,6 +485,42 @@ func TestProcessExecutionPayloadBid_InvalidSignature(t *testing.T) {
require.ErrorContains(t, "bid signature validation failed", err)
}
func TestProcessExecutionPayloadBid_TooManyBlobCommitments(t *testing.T) {
slot := primitives.Slot(9)
proposerIdx := primitives.ValidatorIndex(0)
builderIdx := params.BeaconConfig().BuilderIndexSelfBuild
randao := [32]byte(bytes.Repeat([]byte{0xAA}, 32))
latestHash := [32]byte(bytes.Repeat([]byte{0xBB}, 32))
pubKey := [48]byte{}
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinActivationBalance+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xCC}, 32),
BlockHash: bytes.Repeat([]byte{0xDD}, 32),
PrevRandao: randao[:],
BuilderIndex: builderIdx,
Slot: slot,
BlobKzgCommitments: tooManyBlobCommitmentsForSlot(slot),
FeeRecipient: bytes.Repeat([]byte{0xFF}, 20),
}
signed := &ethpb.SignedExecutionPayloadBid{
Message: bid,
Signature: common.InfiniteSignature[:],
}
block := stubBlock{
slot: slot,
proposer: proposerIdx,
parentRoot: bytesutil.ToBytes32(bid.ParentBlockRoot),
body: stubBlockBody{signedBid: signed},
v: version.Gloas,
}
err := ProcessExecutionPayloadBid(state, block)
require.ErrorContains(t, "blob KZG commitments over max", err)
}
func TestProcessExecutionPayloadBid_SlotMismatch(t *testing.T) {
slot := primitives.Slot(10)
builderIdx := primitives.BuilderIndex(1)
@@ -478,17 +536,17 @@ func TestProcessExecutionPayloadBid_SlotMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot + 1, // mismatch
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0xCC}, 32),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0xAA}, 32),
BlockHash: bytes.Repeat([]byte{0xBB}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot + 1, // mismatch
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0xDD}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -520,17 +578,17 @@ func TestProcessExecutionPayloadBid_ParentHashMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: bytes.Repeat([]byte{0x11}, 32), // mismatch
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: bytes.Repeat([]byte{0x11}, 32), // mismatch
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -563,17 +621,17 @@ func TestProcessExecutionPayloadBid_ParentRootMismatch(t *testing.T) {
parentRoot := bytes.Repeat([]byte{0x22}, 32)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: parentRoot,
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: parentRoot,
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: randao[:],
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)
@@ -605,17 +663,17 @@ func TestProcessExecutionPayloadBid_PrevRandaoMismatch(t *testing.T) {
state := buildGloasState(t, slot, proposerIdx, builderIdx, params.BeaconConfig().MinDepositAmount+1000, randao, latestHash, pubKey)
bid := &ethpb.ExecutionPayloadBid{
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: bytes.Repeat([]byte{0x01}, 32), // mismatch
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitmentsRoot: bytes.Repeat([]byte{0x44}, 32),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
ParentBlockHash: latestHash[:],
ParentBlockRoot: bytes.Repeat([]byte{0x22}, 32),
BlockHash: bytes.Repeat([]byte{0x33}, 32),
PrevRandao: bytes.Repeat([]byte{0x01}, 32), // mismatch
GasLimit: 1,
BuilderIndex: builderIdx,
Slot: slot,
Value: 1,
ExecutionPayment: 0,
BlobKzgCommitments: blobCommitmentsForSlot(slot, 1),
FeeRecipient: bytes.Repeat([]byte{0x55}, 20),
}
genesis := bytesutil.ToBytes32(state.GenesisValidatorsRoot())
sig := signBid(t, sk, bid, state.Fork(), genesis)

View File

@@ -24,14 +24,21 @@ import (
)
// ProcessPayloadAttestations validates payload attestations in a block body.
// Spec v1.7.0-alpha.0 (pseudocode):
// process_payload_attestation(state: BeaconState, payload_attestation: PayloadAttestation):
//
// data = payload_attestation.data
// assert data.beacon_block_root == state.latest_block_header.parent_root
// assert data.slot + 1 == state.slot
// indexed = get_indexed_payload_attestation(state, data.slot, payload_attestation)
// assert is_valid_indexed_payload_attestation(state, indexed)
// <spec fn="process_payload_attestation" fork="gloas" hash="f46bf0b0">
// def process_payload_attestation(
// state: BeaconState, payload_attestation: PayloadAttestation
// ) -> None:
// data = payload_attestation.data
//
// # Check that the attestation is for the parent beacon block
// assert data.beacon_block_root == state.latest_block_header.parent_root
// # Check that the attestation is for the previous slot
// assert data.slot + 1 == state.slot
// # Verify signature
// indexed_payload_attestation = get_indexed_payload_attestation(state, payload_attestation)
// assert is_valid_indexed_payload_attestation(state, indexed_payload_attestation)
// </spec>
func ProcessPayloadAttestations(ctx context.Context, st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) error {
atts, err := body.PayloadAttestations()
if err != nil {
@@ -70,7 +77,7 @@ func ProcessPayloadAttestations(ctx context.Context, st state.BeaconState, body
// indexedPayloadAttestation converts a payload attestation into its indexed form.
func indexedPayloadAttestation(ctx context.Context, st state.ReadOnlyBeaconState, att *eth.PayloadAttestation) (*consensus_types.IndexedPayloadAttestation, error) {
committee, err := payloadCommittee(ctx, st, att.Data.Slot)
committee, err := PayloadCommittee(ctx, st, att.Data.Slot)
if err != nil {
return nil, err
}
@@ -89,19 +96,26 @@ func indexedPayloadAttestation(ctx context.Context, st state.ReadOnlyBeaconState
}, nil
}
// payloadCommittee returns the payload timeliness committee for a given slot for the state.
// Spec v1.7.0-alpha.0 (pseudocode):
// get_ptc(state: BeaconState, slot: Slot) -> Vector[ValidatorIndex, PTC_SIZE]:
// PayloadCommittee returns the payload timeliness committee for a given slot for the state.
//
// epoch = compute_epoch_at_slot(slot)
// seed = hash(get_seed(state, epoch, DOMAIN_PTC_ATTESTER) + uint_to_bytes(slot))
// indices = []
// committees_per_slot = get_committee_count_per_slot(state, epoch)
// for i in range(committees_per_slot):
// committee = get_beacon_committee(state, slot, CommitteeIndex(i))
// indices.extend(committee)
// return compute_balance_weighted_selection(state, indices, seed, size=PTC_SIZE, shuffle_indices=False)
func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot primitives.Slot) ([]primitives.ValidatorIndex, error) {
// <spec fn="get_ptc" fork="gloas" hash="ae15f761">
// def get_ptc(state: BeaconState, slot: Slot) -> Vector[ValidatorIndex, PTC_SIZE]:
// """
// Get the payload timeliness committee for the given ``slot``.
// """
// epoch = compute_epoch_at_slot(slot)
// seed = hash(get_seed(state, epoch, DOMAIN_PTC_ATTESTER) + uint_to_bytes(slot))
// indices: List[ValidatorIndex] = []
// # Concatenate all committees for this slot in order
// committees_per_slot = get_committee_count_per_slot(state, epoch)
// for i in range(committees_per_slot):
// committee = get_beacon_committee(state, slot, CommitteeIndex(i))
// indices.extend(committee)
// return compute_balance_weighted_selection(
// state, indices, seed, size=PTC_SIZE, shuffle_indices=False
// )
// </spec>
func PayloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot primitives.Slot) ([]primitives.ValidatorIndex, error) {
epoch := slots.ToEpoch(slot)
seed, err := ptcSeed(st, epoch, slot)
if err != nil {
@@ -142,6 +156,65 @@ func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
return selected, nil
}
// PTCDuty represents a validator's PTC assignment for a slot.
type PTCDuty struct {
ValidatorIndex primitives.ValidatorIndex
Slot primitives.Slot
}
// PTCDuties returns PTC slot assignments for the requested validators in the epoch derived from the state's slot.
// It's optimized for batch lookups with early exit once all validators are found.
// Validators not in any PTC for the epoch will not appear in the result.
func PTCDuties(
ctx context.Context,
st state.ReadOnlyBeaconState,
validators map[primitives.ValidatorIndex]struct{},
) ([]PTCDuty, error) {
if len(validators) == 0 {
return nil, nil
}
epoch := slots.ToEpoch(st.Slot())
startSlot, err := slots.EpochStart(epoch)
if err != nil {
return nil, err
}
// Track remaining validators to find.
remaining := make(map[primitives.ValidatorIndex]struct{}, len(validators))
for v := range validators {
remaining[v] = struct{}{}
}
var duties []PTCDuty
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
for slot := startSlot; slot < endSlot && len(remaining) > 0; slot++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Compute PTC for this slot.
ptc, err := PayloadCommittee(ctx, st, slot)
if err != nil {
return nil, errors.Wrapf(err, "failed to get PTC for slot %d", slot)
}
// Check which remaining validators are in this PTC.
for _, valIdx := range ptc {
if _, ok := remaining[valIdx]; ok {
duties = append(duties, PTCDuty{
ValidatorIndex: valIdx,
Slot: slot,
})
delete(remaining, valIdx)
}
}
}
return duties, nil
}
// ptcSeed computes the seed for the payload timeliness committee.
func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitives.Slot) ([32]byte, error) {
seed, err := helpers.Seed(st, epoch, params.BeaconConfig().DomainPTCAttester)
@@ -152,17 +225,35 @@ func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitiv
}
// selectByBalance selects a balance-weighted subset of input candidates.
// Spec v1.7.0-alpha.0 (pseudocode):
// compute_balance_weighted_selection(state, indices, seed, size, shuffle_indices):
// Note: shuffle_indices is false for PTC.
//
// total = len(indices); selected = []; i = 0
// while len(selected) < size:
// next = i % total
// if shuffle_indices: next = compute_shuffled_index(next, total, seed)
// if compute_balance_weighted_acceptance(state, indices[next], seed, i):
// selected.append(indices[next])
// i += 1
// <spec fn="compute_balance_weighted_selection" fork="gloas" hash="2c9f1c23">
// def compute_balance_weighted_selection(
// state: BeaconState,
// indices: Sequence[ValidatorIndex],
// seed: Bytes32,
// size: uint64,
// shuffle_indices: bool,
// ) -> Sequence[ValidatorIndex]:
// """
// Return ``size`` indices sampled by effective balance, using ``indices``
// as candidates. If ``shuffle_indices`` is ``True``, candidate indices
// are themselves sampled from ``indices`` by shuffling it, otherwise
// ``indices`` is traversed in order.
// """
// total = uint64(len(indices))
// assert total > 0
// selected: List[ValidatorIndex] = []
// i = uint64(0)
// while len(selected) < size:
// next_index = i % total
// if shuffle_indices:
// next_index = compute_shuffled_index(next_index, total, seed)
// candidate_index = indices[next_index]
// if compute_balance_weighted_acceptance(state, candidate_index, seed, i):
// selected.append(candidate_index)
// i += 1
// return selected
// </spec>
func selectByBalanceFill(
ctx context.Context,
st state.ReadOnlyBeaconState,
@@ -199,15 +290,22 @@ func selectByBalanceFill(
}
// acceptByBalance determines if a validator is accepted based on its effective balance.
// Spec v1.7.0-alpha.0 (pseudocode):
// compute_balance_weighted_acceptance(state, index, seed, i):
//
// MAX_RANDOM_VALUE = 2**16 - 1
// random_bytes = hash(seed + uint_to_bytes(i // 16))
// offset = i % 16 * 2
// random_value = bytes_to_uint64(random_bytes[offset:offset+2])
// effective_balance = state.validators[index].effective_balance
// return effective_balance * MAX_RANDOM_VALUE >= MAX_EFFECTIVE_BALANCE_ELECTRA * random_value
// <spec fn="compute_balance_weighted_acceptance" fork="gloas" hash="9954dcd0">
// def compute_balance_weighted_acceptance(
// state: BeaconState, index: ValidatorIndex, seed: Bytes32, i: uint64
// ) -> bool:
// """
// Return whether to accept the selection of the validator ``index``, with probability
// proportional to its ``effective_balance``, and randomness given by ``seed`` and ``i``.
// """
// MAX_RANDOM_VALUE = 2**16 - 1
// random_bytes = hash(seed + uint_to_bytes(i // 16))
// offset = i % 16 * 2
// random_value = bytes_to_uint64(random_bytes[offset : offset + 2])
// effective_balance = state.validators[index].effective_balance
// return effective_balance * MAX_RANDOM_VALUE >= MAX_EFFECTIVE_BALANCE_ELECTRA * random_value
// </spec>
func acceptByBalance(st state.ReadOnlyBeaconState, idx primitives.ValidatorIndex, seedBuf []byte, hashFunc func([]byte) [32]byte, maxBalance uint64, round uint64) (bool, error) {
// Reuse the seed buffer by overwriting the last 8 bytes with the round counter.
binary.LittleEndian.PutUint64(seedBuf[len(seedBuf)-8:], round/16)
@@ -224,16 +322,26 @@ func acceptByBalance(st state.ReadOnlyBeaconState, idx primitives.ValidatorIndex
}
// validIndexedPayloadAttestation verifies the signature of an indexed payload attestation.
// Spec v1.7.0-alpha.0 (pseudocode):
// is_valid_indexed_payload_attestation(state: BeaconState, indexed_payload_attestation: IndexedPayloadAttestation) -> bool:
//
// indices = indexed_payload_attestation.attesting_indices
// return len(indices) > 0 and indices == sorted(indices) and
// bls.FastAggregateVerify(
// [state.validators[i].pubkey for i in indices],
// compute_signing_root(indexed_payload_attestation.data, get_domain(state, DOMAIN_PTC_ATTESTER, compute_epoch_at_slot(attestation.data.slot)),
// indexed_payload_attestation.signature,
// )
// <spec fn="is_valid_indexed_payload_attestation" fork="gloas" hash="d76e0f89">
// def is_valid_indexed_payload_attestation(
// state: BeaconState, attestation: IndexedPayloadAttestation
// ) -> bool:
// """
// Check if ``attestation`` is non-empty, has sorted indices, and has
// a valid aggregate signature.
// """
// # Verify indices are non-empty and sorted
// indices = attestation.attesting_indices
// if len(indices) == 0 or not indices == sorted(indices):
// return False
//
// # Verify aggregate signature
// pubkeys = [state.validators[i].pubkey for i in indices]
// domain = get_domain(state, DOMAIN_PTC_ATTESTER, compute_epoch_at_slot(attestation.data.slot))
// signing_root = compute_signing_root(attestation.data, domain)
// return bls.FastAggregateVerify(pubkeys, signing_root, attestation.signature)
// </spec>
func validIndexedPayloadAttestation(st state.ReadOnlyBeaconState, att *consensus_types.IndexedPayloadAttestation) error {
indices := att.AttestingIndices
if len(indices) == 0 || !slices.IsSorted(indices) {

View File

@@ -291,6 +291,95 @@ func signAttestation(t *testing.T, st state.ReadOnlyBeaconState, data *eth.Paylo
return agg.Marshal()
}
func TestPTCDuties(t *testing.T) {
helpers.ClearCache()
setupTestConfig(t)
// Create state with enough validators.
numVals := 100
vals := make([]*eth.Validator, numVals)
for i := range numVals {
_, pk := newKey(t)
vals[i] = activeValidator(pk)
}
st := newTestState(t, vals, 0)
t.Run("returns duties for validators in PTC", func(t *testing.T) {
// Request duties for all validators.
requested := make(map[primitives.ValidatorIndex]struct{})
for i := range numVals {
requested[primitives.ValidatorIndex(i)] = struct{}{}
}
duties, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
require.NotEmpty(t, duties, "Should return some duties")
// Verify all returned duties are for requested validators.
for _, duty := range duties {
_, ok := requested[duty.ValidatorIndex]
require.Equal(t, true, ok, "Returned validator should be in requested set")
}
})
t.Run("returns empty for empty request", func(t *testing.T) {
requested := make(map[primitives.ValidatorIndex]struct{})
duties, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
require.Equal(t, 0, len(duties), "Should return no duties for empty request")
})
t.Run("returns empty for validators not in any PTC", func(t *testing.T) {
// Request duties for validators that don't exist.
requested := make(map[primitives.ValidatorIndex]struct{})
for i := 1000000; i < 1000010; i++ {
requested[primitives.ValidatorIndex(i)] = struct{}{}
}
duties, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
require.Equal(t, 0, len(duties), "Non-existent validators should have no duties")
})
t.Run("each validator has at most one duty per epoch", func(t *testing.T) {
requested := make(map[primitives.ValidatorIndex]struct{})
for i := range numVals {
requested[primitives.ValidatorIndex(i)] = struct{}{}
}
duties, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
// Check for duplicates.
seen := make(map[primitives.ValidatorIndex]bool)
for _, duty := range duties {
if seen[duty.ValidatorIndex] {
t.Errorf("Validator %d appears in duties multiple times", duty.ValidatorIndex)
}
seen[duty.ValidatorIndex] = true
}
})
t.Run("duties are deterministic", func(t *testing.T) {
requested := make(map[primitives.ValidatorIndex]struct{})
for i := range 50 {
requested[primitives.ValidatorIndex(i)] = struct{}{}
}
duties1, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
duties2, err := gloas.PTCDuties(t.Context(), st, requested)
require.NoError(t, err)
require.Equal(t, len(duties1), len(duties2), "Should return same number of duties")
for i := range duties1 {
require.Equal(t, duties1[i].ValidatorIndex, duties2[i].ValidatorIndex)
require.Equal(t, duties1[i].Slot, duties2[i].Slot)
}
})
}
type validatorLookupErrState struct {
state.BeaconState
errIndex primitives.ValidatorIndex

View File

@@ -10,17 +10,21 @@ import (
)
// ProcessBuilderPendingPayments processes the builder pending payments from the previous epoch.
// Spec v1.7.0-alpha.0 (pseudocode):
// def process_builder_pending_payments(state: BeaconState) -> None:
//
// quorum = get_builder_payment_quorum_threshold(state)
// for payment in state.builder_pending_payments[:SLOTS_PER_EPOCH]:
// if payment.weight >= quorum:
// state.builder_pending_withdrawals.append(payment.withdrawal)
// <spec fn="process_builder_pending_payments" fork="gloas" hash="10da48dd">
// def process_builder_pending_payments(state: BeaconState) -> None:
// """
// Processes the builder pending payments from the previous epoch.
// """
// quorum = get_builder_payment_quorum_threshold(state)
// for payment in state.builder_pending_payments[:SLOTS_PER_EPOCH]:
// if payment.weight >= quorum:
// state.builder_pending_withdrawals.append(payment.withdrawal)
//
// old_payments = state.builder_pending_payments[SLOTS_PER_EPOCH:]
// new_payments = [BuilderPendingPayment() for _ in range(SLOTS_PER_EPOCH)]
// state.builder_pending_payments = old_payments + new_payments
// old_payments = state.builder_pending_payments[SLOTS_PER_EPOCH:]
// new_payments = [BuilderPendingPayment() for _ in range(SLOTS_PER_EPOCH)]
// state.builder_pending_payments = old_payments + new_payments
// </spec>
func ProcessBuilderPendingPayments(state state.BeaconState) error {
quorum, err := builderQuorumThreshold(state)
if err != nil {
@@ -53,12 +57,16 @@ func ProcessBuilderPendingPayments(state state.BeaconState) error {
}
// builderQuorumThreshold calculates the quorum threshold for builder payments.
// Spec v1.7.0-alpha.0 (pseudocode):
// def get_builder_payment_quorum_threshold(state: BeaconState) -> uint64:
//
// per_slot_balance = get_total_active_balance(state) // SLOTS_PER_EPOCH
// quorum = per_slot_balance * BUILDER_PAYMENT_THRESHOLD_NUMERATOR
// return uint64(quorum // BUILDER_PAYMENT_THRESHOLD_DENOMINATOR)
// <spec fn="get_builder_payment_quorum_threshold" fork="gloas" hash="a64b7ffb">
// def get_builder_payment_quorum_threshold(state: BeaconState) -> uint64:
// """
// Calculate the quorum threshold for builder payments.
// """
// per_slot_balance = get_total_active_balance(state) // SLOTS_PER_EPOCH
// quorum = per_slot_balance * BUILDER_PAYMENT_THRESHOLD_NUMERATOR
// return uint64(quorum // BUILDER_PAYMENT_THRESHOLD_DENOMINATOR)
// </spec>
func builderQuorumThreshold(state state.ReadOnlyBeaconState) (primitives.Gwei, error) {
activeBalance, err := helpers.TotalActiveBalance(state)
if err != nil {

View File

@@ -11,16 +11,20 @@ import (
)
// RemoveBuilderPendingPayment removes the pending builder payment for the proposal slot.
// Spec v1.7.0 (pseudocode):
//
// <spec fn="process_proposer_slashing" fork="gloas" lines="22-32" hash="4da721ef">
// # [New in Gloas:EIP7732]
// # Remove the BuilderPendingPayment corresponding to
// # this proposal if it is still in the 2-epoch window.
// slot = header_1.slot
// proposal_epoch = compute_epoch_at_slot(slot)
// if proposal_epoch == get_current_epoch(state):
// payment_index = SLOTS_PER_EPOCH + slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// payment_index = SLOTS_PER_EPOCH + slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// elif proposal_epoch == get_previous_epoch(state):
// payment_index = slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// payment_index = slot % SLOTS_PER_EPOCH
// state.builder_pending_payments[payment_index] = BuilderPendingPayment()
// </spec>
func RemoveBuilderPendingPayment(st state.BeaconState, header *eth.BeaconBlockHeader) error {
proposalEpoch := slots.ToEpoch(header.Slot)
currentEpoch := time.CurrentEpoch(st)

View File

@@ -33,7 +33,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -1,15 +1,11 @@
package peerdas
import (
stderrors "errors"
"iter"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/container/trie"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
)
@@ -20,7 +16,6 @@ var (
ErrIndexTooLarge = errors.New("column index is larger than the specified columns count")
ErrNoKzgCommitments = errors.New("no KZG commitments found")
ErrMismatchLength = errors.New("mismatch in the length of the column, commitments or proofs")
ErrEmptySegment = errors.New("empty segment in batch")
ErrInvalidKZGProof = errors.New("invalid KZG proof")
ErrBadRootLength = errors.New("bad root length")
ErrInvalidInclusionProof = errors.New("invalid inclusion proof")
@@ -62,127 +57,62 @@ func VerifyDataColumnSidecar(sidecar blocks.RODataColumn) error {
return nil
}
// CellProofBundleSegment is returned when a batch fails. The caller can call
// the `.Verify` method to verify just this segment.
type CellProofBundleSegment struct {
indices []uint64
commitments []kzg.Bytes48
cells []kzg.Cell
proofs []kzg.Bytes48
}
// Verify verifies this segment without batching.
func (s CellProofBundleSegment) Verify() error {
if len(s.cells) == 0 {
return ErrEmptySegment
}
verified, err := kzg.VerifyCellKZGProofBatch(s.commitments, s.indices, s.cells, s.proofs)
if err != nil {
return stderrors.Join(err, ErrInvalidKZGProof)
}
if !verified {
return ErrInvalidKZGProof
}
return nil
}
func VerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIter iter.Seq[blocks.CellProofBundle]) error {
// ignore the failed segment list since we are just passing in one segment.
_, err := BatchVerifyDataColumnsCellsKZGProofs(sizeHint, []iter.Seq[blocks.CellProofBundle]{cellProofsIter})
return err
}
// BatchVerifyDataColumnsCellsKZGProofs verifies if the KZG proofs are correct.
// VerifyDataColumnsSidecarKZGProofs verifies if the KZG proofs are correct.
// Note: We are slightly deviating from the specification here:
// The specification verifies the KZG proofs for each sidecar separately,
// while we are verifying all the KZG proofs from multiple sidecars in a batch.
// This is done to improve performance since the internal KZG library is way more
// efficient when verifying in batch. If the batch fails, the failed segments
// are returned to the caller so that they may try segment by segment without
// batching. On success the failed segment list is empty.
//
// efficient when verifying in batch.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs
func BatchVerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIters []iter.Seq[blocks.CellProofBundle]) ( /* failed segment list */ []CellProofBundleSegment, error) {
commitments := make([]kzg.Bytes48, 0, sizeHint)
indices := make([]uint64, 0, sizeHint)
cells := make([]kzg.Cell, 0, sizeHint)
proofs := make([]kzg.Bytes48, 0, sizeHint)
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) error {
// Compute the total count.
count := 0
for _, sidecar := range sidecars {
count += len(sidecar.Column)
}
var anySegmentEmpty bool
var segments []CellProofBundleSegment
for _, cellProofsIter := range cellProofsIters {
startIdx := len(cells)
for bundle := range cellProofsIter {
commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)
for _, sidecar := range sidecars {
for i := range sidecar.Column {
var (
commitment kzg.Bytes48
cell kzg.Cell
proof kzg.Bytes48
)
if len(bundle.Commitment) != len(commitment) ||
len(bundle.Cell) != len(cell) ||
len(bundle.Proof) != len(proof) {
return nil, ErrMismatchLength
commitmentBytes := sidecar.KzgCommitments[i]
cellBytes := sidecar.Column[i]
proofBytes := sidecar.KzgProofs[i]
if len(commitmentBytes) != len(commitment) ||
len(cellBytes) != len(cell) ||
len(proofBytes) != len(proof) {
return ErrMismatchLength
}
copy(commitment[:], bundle.Commitment)
copy(cell[:], bundle.Cell)
copy(proof[:], bundle.Proof)
copy(commitment[:], commitmentBytes)
copy(cell[:], cellBytes)
copy(proof[:], proofBytes)
commitments = append(commitments, commitment)
indices = append(indices, bundle.ColumnIndex)
indices = append(indices, sidecar.Index)
cells = append(cells, cell)
proofs = append(proofs, proof)
}
if len(cells[startIdx:]) == 0 {
anySegmentEmpty = true
}
segments = append(segments, CellProofBundleSegment{
indices: indices[startIdx:],
commitments: commitments[startIdx:],
cells: cells[startIdx:],
proofs: proofs[startIdx:],
})
}
if anySegmentEmpty {
return segments, ErrEmptySegment
}
// Batch verify that the cells match the corresponding commitments and proofs.
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
if err != nil {
return segments, stderrors.Join(err, ErrInvalidKZGProof)
return errors.Wrap(err, "verify cell KZG proof batch")
}
if !verified {
return segments, ErrInvalidKZGProof
}
return nil, nil
}
// verifyKzgCommitmentsInclusionProof is the shared implementation for inclusion proof verification.
func verifyKzgCommitmentsInclusionProof(bodyRoot []byte, kzgCommitments [][]byte, inclusionProof [][]byte) error {
if len(bodyRoot) != fieldparams.RootLength {
return ErrBadRootLength
}
leaves := blocks.LeavesFromCommitments(kzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(bodyRoot, hashTreeRoot[:], kzgPosition, inclusionProof)
if !verified {
return ErrInvalidInclusionProof
return ErrInvalidKZGProof
}
return nil
@@ -194,23 +124,30 @@ func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
if sidecar.SignedBlockHeader == nil || sidecar.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
return verifyKzgCommitmentsInclusionProof(
sidecar.SignedBlockHeader.Header.BodyRoot,
sidecar.KzgCommitments,
sidecar.KzgCommitmentsInclusionProof,
)
}
// VerifyPartialDataColumnHeaderInclusionProof verifies if the KZG commitments are included in the beacon block.
func VerifyPartialDataColumnHeaderInclusionProof(header *ethpb.PartialDataColumnHeader) error {
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
root := sidecar.SignedBlockHeader.Header.BodyRoot
if len(root) != fieldparams.RootLength {
return ErrBadRootLength
}
return verifyKzgCommitmentsInclusionProof(
header.SignedBlockHeader.Header.BodyRoot,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
leaves := blocks.LeavesFromCommitments(sidecar.KzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(root, hashTreeRoot[:], kzgPosition, sidecar.KzgCommitmentsInclusionProof)
if !verified {
return ErrInvalidInclusionProof
}
return nil
}
// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar.

View File

@@ -3,7 +3,6 @@ package peerdas_test
import (
"crypto/rand"
"fmt"
"iter"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
@@ -73,7 +72,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0] = sidecars[0].Column[0][:len(sidecars[0].Column[0])-1] // Remove one byte to create size mismatch
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
@@ -81,15 +80,14 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
failedSegments, err := peerdas.BatchVerifyDataColumnsCellsKZGProofs(blobCount, []iter.Seq[blocks.CellProofBundle]{blocks.RODataColumnsToCellProofBundles(sidecars)})
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.NoError(t, err)
require.Equal(t, 0, len(failedSegments))
})
}
@@ -275,7 +273,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -310,7 +308,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(allSidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -343,7 +341,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for _, sidecars := range allSidecars {
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(len(allSidecars), blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}

View File

@@ -5,7 +5,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -340,8 +339,7 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg
}
// ComputeCellsAndProofsFromStructured computes the cells and proofs from blobs and cell proofs.
// commitmentCount is required to return the correct sized bitlist even if we see a nil slice of blobsAndProofs.
func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs []*pb.BlobAndProofV2) (bitfield.Bitlist /* parts included */, [][]kzg.Cell, [][]kzg.Proof, error) {
func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([][]kzg.Cell, [][]kzg.Proof, error) {
start := time.Now()
defer func() {
cellsAndProofsFromStructuredComputationTime.Observe(float64(time.Since(start).Milliseconds()))
@@ -349,24 +347,14 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
var wg errgroup.Group
var blobsPresent int
for _, blobAndProof := range blobsAndProofs {
if blobAndProof != nil {
blobsPresent++
}
}
cellsPerBlob := make([][]kzg.Cell, blobsPresent)
proofsPerBlob := make([][]kzg.Proof, blobsPresent)
included := bitfield.NewBitlist(commitmentCount)
cellsPerBlob := make([][]kzg.Cell, len(blobsAndProofs))
proofsPerBlob := make([][]kzg.Proof, len(blobsAndProofs))
var j int
for i, blobAndProof := range blobsAndProofs {
if blobAndProof == nil {
continue
return nil, nil, ErrNilBlobAndProof
}
included.SetBitAt(uint64(i), true)
compactIndex := j
wg.Go(func() error {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) {
@@ -393,18 +381,17 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
kzgProofs = append(kzgProofs, kzgProof)
}
cellsPerBlob[compactIndex] = cells
proofsPerBlob[compactIndex] = kzgProofs
cellsPerBlob[i] = cells
proofsPerBlob[i] = kzgProofs
return nil
})
j++
}
if err := wg.Wait(); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// ReconstructBlobs reconstructs blobs from data column sidecars without computing KZG proofs or creating sidecars.

View File

@@ -479,9 +479,8 @@ func TestComputeCellsAndProofsFromFlat(t *testing.T) {
func TestComputeCellsAndProofsFromStructured(t *testing.T) {
t.Run("nil blob and proof", func(t *testing.T) {
included, _, _, err := peerdas.ComputeCellsAndProofsFromStructured(0, []*pb.BlobAndProofV2{nil})
require.NoError(t, err)
require.Equal(t, uint64(0), included.Count())
_, _, err := peerdas.ComputeCellsAndProofsFromStructured([]*pb.BlobAndProofV2{nil})
require.ErrorIs(t, err, peerdas.ErrNilBlobAndProof)
})
t.Run("nominal", func(t *testing.T) {
@@ -534,8 +533,7 @@ func TestComputeCellsAndProofsFromStructured(t *testing.T) {
require.NoError(t, err)
// Test ComputeCellsAndProofs
included, actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(blobsAndProofs)), blobsAndProofs)
require.Equal(t, included.Count(), uint64(len(actualCellsPerBlob)))
actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobsAndProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsPerBlob))

View File

@@ -3,7 +3,6 @@ package peerdas
import (
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -144,40 +143,6 @@ func DataColumnSidecars(cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof,
return roSidecars, nil
}
func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof, src ConstructionPopulator) ([]blocks.PartialDataColumn, error) {
start := time.Now()
const numberOfColumns = uint64(fieldparams.NumberOfColumns)
cells, proofs, err := rotateRowsToCols(cellsPerBlob, proofsPerBlob, numberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
dataColumns := make([]blocks.PartialDataColumn, 0, numberOfColumns)
for idx := range numberOfColumns {
dc, err := blocks.NewPartialDataColumn(info.signedBlockHeader, idx, info.kzgCommitments, info.kzgInclusionProof)
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
for i := range len(info.kzgCommitments) {
if !included.BitAt(uint64(i)) {
continue
}
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
dataColumns = append(dataColumns, dc)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return dataColumns, nil
}
// Slot returns the slot of the source
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()

View File

@@ -143,10 +143,11 @@ func ProcessSlot(ctx context.Context, state state.BeaconState) (state.BeaconStat
return nil, err
}
// Spec v1.6.1 (pseudocode):
// <spec fn="process_slot" fork="gloas" lines="11-13" hash="62b28839">
// # [New in Gloas:EIP7732]
// # Unset the next payload availability
// state.execution_payload_availability[(state.slot + 1) % SLOTS_PER_HISTORICAL_ROOT] = 0b0
// </spec>
if state.Version() >= version.Gloas {
index := uint64((state.Slot() + 1) % params.BeaconConfig().SlotsPerHistoricalRoot)
if err := state.UpdateExecutionPayloadAvailabilityAtIndex(index, 0x0); err != nil {

View File

@@ -78,7 +78,7 @@ func newGloasState(t *testing.T, slot primitives.Slot, availability []byte) stat
BlockHash: make([]byte, 32),
PrevRandao: make([]byte, 32),
FeeRecipient: make([]byte, 20),
BlobKzgCommitmentsRoot: make([]byte, 32),
BlobKzgCommitments: [][]byte{make([]byte, 48)},
},
Eth1Data: &ethpb.Eth1Data{
DepositRoot: make([]byte, 32),

View File

@@ -66,6 +66,10 @@ type ReadOnlyDatabase interface {
OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
BackfillStatus(context.Context) (*dbval.BackfillStatus, error)
// Execution payload envelope operations (Gloas+).
ExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error)
HasExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) bool
// P2P Metadata operations.
MetadataSeqNum(ctx context.Context) (uint64, error)
}
@@ -115,6 +119,10 @@ type NoHeadAccessDatabase interface {
SaveLightClientUpdate(ctx context.Context, period uint64, update interfaces.LightClientUpdate) error
SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error
// Execution payload envelope operations (Gloas+).
SaveExecutionPayloadEnvelope(ctx context.Context, envelope *ethpb.SignedExecutionPayloadEnvelope) error
DeleteExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) error
CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot, batchSize int) (int, error)

View File

@@ -13,6 +13,7 @@ go_library(
"encoding.go",
"error.go",
"execution_chain.go",
"execution_payload_envelope.go",
"finalized_block_roots.go",
"genesis.go",
"key.go",
@@ -96,6 +97,7 @@ go_test(
"deposit_contract_test.go",
"encoding_test.go",
"execution_chain_test.go",
"execution_payload_envelope_test.go",
"finalized_block_roots_test.go",
"genesis_test.go",
"init_test.go",

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
@@ -33,6 +34,9 @@ func (s *Store) LastArchivedRoot(ctx context.Context) [32]byte {
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
_, blockRoot = bkt.Cursor().Last()
if len(blockRoot) > 0 {
blockRoot = slices.Clone(blockRoot)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.
@@ -51,6 +55,9 @@ func (s *Store) ArchivedPointRoot(ctx context.Context, slot primitives.Slot) [32
if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSlotIndicesBucket)
blockRoot = bucket.Get(bytesutil.SlotToBytesBigEndian(slot))
if len(blockRoot) > 0 {
blockRoot = slices.Clone(blockRoot)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.

View File

@@ -517,6 +517,10 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p
return errors.Wrap(err, "could not delete validators")
}
// TODO: execution payload envelopes (Gloas+) are keyed by execution payload
// block hash, not beacon block root, so they cannot be pruned in this loop.
// A separate pruning mechanism is needed (e.g. secondary index or cursor scan).
numSlotsDeleted++
}
@@ -812,7 +816,10 @@ func (s *Store) FeeRecipientByValidatorID(ctx context.Context, id primitives.Val
var addr []byte
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(feeRecipientBucket)
addr = bkt.Get(bytesutil.Uint64ToBytesBigEndian(uint64(id)))
stored := bkt.Get(bytesutil.Uint64ToBytesBigEndian(uint64(id)))
if len(stored) > 0 {
addr = slices.Clone(stored)
}
// IF the fee recipient is not found in the standard fee recipient bucket, then
// check the registration bucket. The fee recipient may be there.
// This is to resolve imcompatility until we fully migrate to the registration bucket.
@@ -826,7 +833,7 @@ func (s *Store) FeeRecipientByValidatorID(ctx context.Context, id primitives.Val
if err := decode(ctx, enc, reg); err != nil {
return err
}
addr = reg.FeeRecipient
addr = slices.Clone(reg.FeeRecipient)
}
return nil
})
@@ -1247,6 +1254,12 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.ReadOnlySignedBea
if err := rawBlock.UnmarshalSSZ(enc[len(fuluBlindKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal blinded Fulu block")
}
case hasGloasKey(enc):
// post Gloas we save the full beacon block as EIP-7732 separates beacon block and payload
rawBlock = &ethpb.SignedBeaconBlockGloas{}
if err := rawBlock.UnmarshalSSZ(enc[len(gloasKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal Gloas block")
}
default:
// Marshal block bytes to phase 0 beacon block.
rawBlock = &ethpb.SignedBeaconBlock{}
@@ -1277,6 +1290,11 @@ func encodeBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
func keyForBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
v := blk.Version()
if v >= version.Gloas {
// Gloas blocks are never blinded (no execution payload in block body).
return gloasKey, nil
}
if v >= version.Fulu {
if blk.IsBlinded() {
return fuluBlindKey, nil

View File

@@ -151,6 +151,17 @@ var blockTests = []struct {
}
return blocks.NewSignedBeaconBlock(b)
}},
{
name: "gloas",
newBlock: func(slot primitives.Slot, root []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
b := util.NewBeaconBlockGloas()
b.Block.Slot = slot
if root != nil {
b.Block.ParentRoot = root
}
return blocks.NewSignedBeaconBlock(b)
},
},
}
func TestStore_SaveBlock_NoDuplicates(t *testing.T) {
@@ -211,7 +222,7 @@ func TestStore_BlocksCRUD(t *testing.T) {
retrievedBlock, err = db.Block(ctx, blockRoot)
require.NoError(t, err)
wanted := retrievedBlock
if retrievedBlock.Version() >= version.Bellatrix {
if retrievedBlock.Version() >= version.Bellatrix && retrievedBlock.Version() < version.Gloas {
wanted, err = retrievedBlock.ToBlinded()
require.NoError(t, err)
}
@@ -643,7 +654,7 @@ func TestStore_BlocksCRUD_NoCache(t *testing.T) {
require.NoError(t, err)
wanted := blk
if blk.Version() >= version.Bellatrix {
if blk.Version() >= version.Bellatrix && blk.Version() < version.Gloas {
wanted, err = blk.ToBlinded()
require.NoError(t, err)
}
@@ -1014,7 +1025,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.Gloas {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
@@ -1032,7 +1043,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted2 := block2
if block2.Version() >= version.Bellatrix {
if block2.Version() >= version.Bellatrix && block2.Version() < version.Gloas {
wanted2, err = block2.ToBlinded()
require.NoError(t, err)
}
@@ -1050,7 +1061,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = block3
if block3.Version() >= version.Bellatrix {
if block3.Version() >= version.Bellatrix && block3.Version() < version.Gloas {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
@@ -1086,7 +1097,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.Gloas {
wanted, err = block1.ToBlinded()
require.NoError(t, err)
}
@@ -1103,7 +1114,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.Gloas {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
@@ -1120,7 +1131,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.Gloas {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
@@ -1216,7 +1227,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)
wanted := b1
if b1.Version() >= version.Bellatrix {
if b1.Version() >= version.Bellatrix && b1.Version() < version.Gloas {
wanted, err = b1.ToBlinded()
require.NoError(t, err)
}
@@ -1232,7 +1243,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
t.Fatalf("Expected 2 blocks, received %d blocks", len(retrievedBlocks))
}
wanted = b2
if b2.Version() >= version.Bellatrix {
if b2.Version() >= version.Bellatrix && b2.Version() < version.Gloas {
wanted, err = b2.ToBlinded()
require.NoError(t, err)
}
@@ -1242,7 +1253,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, true, proto.Equal(wantedPb, retrieved0Pb), "Wanted: %v, received: %v", retrievedBlocks[0], wanted)
wanted = b3
if b3.Version() >= version.Bellatrix {
if b3.Version() >= version.Bellatrix && b3.Version() < version.Gloas {
wanted, err = b3.ToBlinded()
require.NoError(t, err)
}

View File

@@ -3,6 +3,7 @@ package kv
import (
"context"
"fmt"
"slices"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/ethereum/go-ethereum/common"
@@ -17,7 +18,10 @@ func (s *Store) DepositContractAddress(ctx context.Context) ([]byte, error) {
var addr []byte
if err := s.db.View(func(tx *bolt.Tx) error {
chainInfo := tx.Bucket(chainMetadataBucket)
addr = chainInfo.Get(depositContractAddressKey)
stored := chainInfo.Get(depositContractAddressKey)
if len(stored) > 0 {
addr = slices.Clone(stored)
}
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err) // lint:nopanic -- View never returns an error.

View File

@@ -0,0 +1,123 @@
package kv
import (
"context"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/golang/snappy"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// SaveExecutionPayloadEnvelope blinds and saves a signed execution payload envelope keyed by
// beacon block root. The envelope is stored in blinded form: the full execution payload is replaced
// with its block hash. The full payload can later be retrieved from the EL via
// engine_getPayloadBodiesByHash.
func (s *Store) SaveExecutionPayloadEnvelope(ctx context.Context, env *ethpb.SignedExecutionPayloadEnvelope) error {
_, span := trace.StartSpan(ctx, "BeaconDB.SaveExecutionPayloadEnvelope")
defer span.End()
if env == nil || env.Message == nil || env.Message.Payload == nil {
return errors.New("cannot save nil execution payload envelope")
}
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
blinded := blindEnvelope(env)
enc, err := encodeBlindedEnvelope(blinded)
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
return bkt.Put(blockRoot[:], enc)
})
}
// ExecutionPayloadEnvelope retrieves the blinded signed execution payload envelope by beacon block root.
func (s *Store) ExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.ExecutionPayloadEnvelope")
defer span.End()
var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
enc = bkt.Get(blockRoot[:])
return nil
}); err != nil {
return nil, err
}
if enc == nil {
return nil, errors.Wrap(ErrNotFound, "execution payload envelope not found")
}
return decodeBlindedEnvelope(enc)
}
// HasExecutionPayloadEnvelope checks whether an execution payload envelope exists for the given beacon block root.
func (s *Store) HasExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) bool {
_, span := trace.StartSpan(ctx, "BeaconDB.HasExecutionPayloadEnvelope")
defer span.End()
var exists bool
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
exists = bkt.Get(blockRoot[:]) != nil
return nil
}); err != nil {
return false
}
return exists
}
// DeleteExecutionPayloadEnvelope removes a signed execution payload envelope by beacon block root.
func (s *Store) DeleteExecutionPayloadEnvelope(ctx context.Context, blockRoot [32]byte) error {
_, span := trace.StartSpan(ctx, "BeaconDB.DeleteExecutionPayloadEnvelope")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopesBucket)
return bkt.Delete(blockRoot[:])
})
}
// blindEnvelope converts a full signed envelope to its blinded form by replacing
// the execution payload with its block hash. This avoids computing the expensive
// payload hash tree root on the critical path.
func blindEnvelope(env *ethpb.SignedExecutionPayloadEnvelope) *ethpb.SignedBlindedExecutionPayloadEnvelope {
return &ethpb.SignedBlindedExecutionPayloadEnvelope{
Message: &ethpb.BlindedExecutionPayloadEnvelope{
BlockHash: env.Message.Payload.BlockHash,
ExecutionRequests: env.Message.ExecutionRequests,
BuilderIndex: env.Message.BuilderIndex,
BeaconBlockRoot: env.Message.BeaconBlockRoot,
Slot: env.Message.Slot,
StateRoot: env.Message.StateRoot,
},
Signature: env.Signature,
}
}
// encodeBlindedEnvelope SSZ-encodes and snappy-compresses a blinded envelope for storage.
func encodeBlindedEnvelope(env *ethpb.SignedBlindedExecutionPayloadEnvelope) ([]byte, error) {
sszBytes, err := env.MarshalSSZ()
if err != nil {
return nil, errors.Wrap(err, "could not marshal blinded envelope")
}
return snappy.Encode(nil, sszBytes), nil
}
// decodeBlindedEnvelope snappy-decompresses and SSZ-decodes a blinded envelope from storage.
func decodeBlindedEnvelope(enc []byte) (*ethpb.SignedBlindedExecutionPayloadEnvelope, error) {
dec, err := snappy.Decode(nil, enc)
if err != nil {
return nil, errors.Wrap(err, "could not snappy decode envelope")
}
blinded := &ethpb.SignedBlindedExecutionPayloadEnvelope{}
if err := blinded.UnmarshalSSZ(dec); err != nil {
return nil, errors.Wrap(err, "could not unmarshal blinded envelope")
}
return blinded, nil
}

View File

@@ -0,0 +1,124 @@
package kv
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func testEnvelope(t *testing.T) *ethpb.SignedExecutionPayloadEnvelope {
t.Helper()
return &ethpb.SignedExecutionPayloadEnvelope{
Message: &ethpb.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadDeneb{
ParentHash: bytesutil.PadTo([]byte("parent"), 32),
FeeRecipient: bytesutil.PadTo([]byte("fee"), 20),
StateRoot: bytesutil.PadTo([]byte("stateroot"), 32),
ReceiptsRoot: bytesutil.PadTo([]byte("receipts"), 32),
LogsBloom: bytesutil.PadTo([]byte{}, 256),
PrevRandao: bytesutil.PadTo([]byte("randao"), 32),
BlockNumber: 100,
GasLimit: 30000000,
GasUsed: 21000,
Timestamp: 1000,
ExtraData: []byte("extra"),
BaseFeePerGas: bytesutil.PadTo([]byte{1}, 32),
BlockHash: bytesutil.PadTo([]byte("blockhash"), 32),
Transactions: [][]byte{[]byte("tx1"), []byte("tx2")},
Withdrawals: []*enginev1.Withdrawal{{Index: 1, ValidatorIndex: 2, Address: bytesutil.PadTo([]byte("addr"), 20), Amount: 100}},
BlobGasUsed: 131072,
ExcessBlobGas: 0,
},
ExecutionRequests: &enginev1.ExecutionRequests{},
BuilderIndex: primitives.BuilderIndex(42),
BeaconBlockRoot: bytesutil.PadTo([]byte("beaconroot"), 32),
Slot: primitives.Slot(99),
StateRoot: bytesutil.PadTo([]byte("envelopestateroot"), 32),
},
Signature: bytesutil.PadTo([]byte("sig"), 96),
}
}
func TestStore_SaveAndRetrieveExecutionPayloadEnvelope(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
env := testEnvelope(t)
// Keyed by beacon block root.
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
// Initially should not exist.
assert.Equal(t, false, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
// Save (always blinds internally).
require.NoError(t, db.SaveExecutionPayloadEnvelope(ctx, env))
// Should exist now.
assert.Equal(t, true, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
// Load and verify it's blinded.
loaded, err := db.ExecutionPayloadEnvelope(ctx, blockRoot)
require.NoError(t, err)
// Verify metadata is preserved.
assert.Equal(t, env.Message.Slot, loaded.Message.Slot)
assert.Equal(t, env.Message.BuilderIndex, loaded.Message.BuilderIndex)
assert.DeepEqual(t, env.Message.BeaconBlockRoot, loaded.Message.BeaconBlockRoot)
assert.DeepEqual(t, env.Message.StateRoot, loaded.Message.StateRoot)
assert.DeepEqual(t, env.Signature, loaded.Signature)
// BlockHash should be the payload's block hash (not a hash tree root).
assert.DeepEqual(t, env.Message.Payload.BlockHash, loaded.Message.BlockHash)
}
func TestStore_DeleteExecutionPayloadEnvelope(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
env := testEnvelope(t)
blockRoot := bytesutil.ToBytes32(env.Message.BeaconBlockRoot)
require.NoError(t, db.SaveExecutionPayloadEnvelope(ctx, env))
assert.Equal(t, true, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
require.NoError(t, db.DeleteExecutionPayloadEnvelope(ctx, blockRoot))
assert.Equal(t, false, db.HasExecutionPayloadEnvelope(ctx, blockRoot))
}
func TestStore_ExecutionPayloadEnvelope_NotFound(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
nonExistent := bytesutil.ToBytes32([]byte("nonexistent"))
_, err := db.ExecutionPayloadEnvelope(ctx, nonExistent)
require.ErrorContains(t, "not found", err)
}
func TestStore_SaveExecutionPayloadEnvelope_NilRejected(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
err := db.SaveExecutionPayloadEnvelope(ctx, nil)
require.ErrorContains(t, "nil", err)
}
func TestBlindEnvelope_PreservesBlockHash(t *testing.T) {
env := testEnvelope(t)
blinded := blindEnvelope(env)
// Should contain the block hash from the payload, not a hash tree root.
assert.DeepEqual(t, env.Message.Payload.BlockHash, blinded.Message.BlockHash)
// Metadata should be preserved.
assert.Equal(t, env.Message.BuilderIndex, blinded.Message.BuilderIndex)
assert.Equal(t, env.Message.Slot, blinded.Message.Slot)
assert.DeepEqual(t, env.Message.BeaconBlockRoot, blinded.Message.BeaconBlockRoot)
assert.DeepEqual(t, env.Message.StateRoot, blinded.Message.StateRoot)
assert.DeepEqual(t, env.Signature, blinded.Signature)
}

View File

@@ -87,3 +87,10 @@ func hasFuluBlindKey(enc []byte) bool {
}
return bytes.Equal(enc[:len(fuluBlindKey)], fuluBlindKey)
}
func hasGloasKey(enc []byte) bool {
if len(gloasKey) >= len(enc) {
return false
}
return bytes.Equal(enc[:len(gloasKey)], gloasKey)
}

View File

@@ -126,6 +126,7 @@ var Buckets = [][]byte{
feeRecipientBucket,
registrationBucket,
custodyBucket,
executionPayloadEnvelopesBucket,
}
// KVStoreOption is a functional option that modifies a kv.Store.

View File

@@ -199,7 +199,7 @@ func performValidatorStateMigration(ctx context.Context, bar *progressbar.Progre
func stateBucketKeys(stateBucket *bolt.Bucket) ([][]byte, error) {
var keys [][]byte
if err := stateBucket.ForEach(func(pubKey, v []byte) error {
keys = append(keys, pubKey)
keys = append(keys, bytes.Clone(pubKey))
return nil
}); err != nil {
return nil, err

View File

@@ -7,16 +7,17 @@ package kv
// it easy to scan for keys that have a certain shard number as a prefix and return those
// corresponding attestations.
var (
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
stateDiffBucket = []byte("state-diff")
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
stateDiffBucket = []byte("state-diff")
executionPayloadEnvelopesBucket = []byte("execution-payload-envelopes")
// Light Client Updates Bucket
lightClientUpdatesBucket = []byte("light-client-updates")
@@ -60,6 +61,8 @@ var (
electraBlindKey = []byte("blind-electra")
fuluKey = []byte("fulu")
fuluBlindKey = []byte("blind-fulu")
gloasKey = []byte("gloas")
// No gloasBlindKey needed - Gloas blocks are never blinded (no execution payload in block body).
// block root included in the beacon state used by weak subjectivity initial sync
originCheckpointBlockRootKey = []byte("origin-checkpoint-block-root")

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
@@ -187,20 +188,23 @@ func (s *Store) getDiff(lvl int, slot uint64) (hdiff.HdiffBytes, error) {
return bolt.ErrBucketNotFound
}
buf := append(key, stateSuffix...)
stateDiff = bucket.Get(buf)
if stateDiff == nil {
rawStateDiff := bucket.Get(buf)
if len(rawStateDiff) == 0 {
return errors.New("state diff not found")
}
stateDiff = slices.Clone(rawStateDiff)
buf = append(key, validatorSuffix...)
validatorDiff = bucket.Get(buf)
if validatorDiff == nil {
rawValidatorDiff := bucket.Get(buf)
if len(rawValidatorDiff) == 0 {
return errors.New("validator diff not found")
}
validatorDiff = slices.Clone(rawValidatorDiff)
buf = append(key, balancesSuffix...)
balancesDiff = bucket.Get(buf)
if balancesDiff == nil {
rawBalancesDiff := bucket.Get(buf)
if len(rawBalancesDiff) == 0 {
return errors.New("balances diff not found")
}
balancesDiff = slices.Clone(rawBalancesDiff)
return nil
})
@@ -224,10 +228,11 @@ func (s *Store) getFullSnapshot(slot uint64) (state.BeaconState, error) {
if bucket == nil {
return bolt.ErrBucketNotFound
}
enc = bucket.Get(key)
if enc == nil {
rawEnc := bucket.Get(key)
if rawEnc == nil {
return errors.New("state not found")
}
enc = slices.Clone(rawEnc)
return nil
})

View File

@@ -2,6 +2,7 @@ package kv
import (
"context"
"slices"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -47,7 +48,11 @@ func (s *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.St
}
var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
enc = tx.Bucket(stateSummaryBucket).Get(blockRoot[:])
rawEnc := tx.Bucket(stateSummaryBucket).Get(blockRoot[:])
if len(rawEnc) == 0 {
return nil
}
enc = slices.Clone(rawEnc)
return nil
}); err != nil {
return nil, err

View File

@@ -8,6 +8,7 @@ go_library(
"deposit.go",
"engine_client.go",
"errors.go",
"graffiti_info.go",
"log.go",
"log_processing.go",
"metrics.go",
@@ -73,7 +74,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
@@ -90,6 +90,7 @@ go_test(
"engine_client_fuzz_test.go",
"engine_client_test.go",
"execution_chain_test.go",
"graffiti_info_test.go",
"init_test.go",
"log_processing_test.go",
"mock_test.go",

View File

@@ -7,7 +7,6 @@ import (
"strings"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
@@ -59,11 +58,20 @@ var (
fuluEngineEndpoints = []string{
GetPayloadMethodV5,
GetBlobsV2,
GetBlobsV3,
}
)
// ClientVersionV1 represents the response from engine_getClientVersionV1.
type ClientVersionV1 struct {
Code string `json:"code"`
Name string `json:"name"`
Version string `json:"version"`
Commit string `json:"commit"`
}
const (
// GetClientVersionMethod is the engine_getClientVersionV1 method for JSON-RPC.
GetClientVersionMethod = "engine_getClientVersionV1"
// NewPayloadMethod v1 request string for JSON-RPC.
NewPayloadMethod = "engine_newPayloadV1"
// NewPayloadMethodV2 v2 request string for JSON-RPC.
@@ -101,8 +109,6 @@ const (
GetBlobsV1 = "engine_getBlobsV1"
// GetBlobsV2 request string for JSON-RPC.
GetBlobsV2 = "engine_getBlobsV2"
// GetBlobsV3 request string for JSON-RPC.
GetBlobsV3 = "engine_getBlobsV3"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
@@ -126,7 +132,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -354,6 +360,24 @@ func (s *Service) ExchangeCapabilities(ctx context.Context) ([]string, error) {
return elSupportedEndpointsSlice, nil
}
// GetClientVersion calls engine_getClientVersionV1 to retrieve EL client information.
func (s *Service) GetClientVersion(ctx context.Context) ([]ClientVersionV1, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetClientVersion")
defer span.End()
// Per spec, we send our own client info as the parameter
clVersion := ClientVersionV1{
Code: CLCode,
Name: Name,
Version: version.SemanticVersion(),
Commit: version.GetCommitPrefix(),
}
var result []ClientVersionV1
err := s.rpcClient.CallContext(ctx, &result, GetClientVersionMethod, clVersion)
return result, handleRPCError(err)
}
// GetTerminalBlockHash returns the valid terminal block hash based on total difficulty.
//
// Spec code:
@@ -557,22 +581,6 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return result, handleRPCError(err)
}
func (s *Service) GetBlobsV3(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV3")
defer span.End()
start := time.Now()
if !s.capabilityCache.has(GetBlobsV3) {
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV3))
}
getBlobsV3RequestsTotal.Inc()
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV3, versionedHashes)
getBlobsV3Latency.Observe(time.Since(start).Seconds())
return result, handleRPCError(err)
}
// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
@@ -683,47 +691,40 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
root := populator.Root()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
commitments, err := populator.Commitments()
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "commitments")
return nil, wrapWithBlockRoot(err, root, "commitments")
}
included, cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
log.Info("Received cells and proofs from execution client", "included", included, "cells count", len(cellsPerBlob), "err", err)
cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, populator)
haveAllBlobs := included.Count() == uint64(len(commitments))
log.Info("Constructed partial columns", "haveAllBlobs", haveAllBlobs)
if haveAllBlobs {
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, partialColumns, nil
// Return early if nothing is returned from the EL.
if len(cellsPerBlob) == 0 {
return nil, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "partial columns from column sidecar")
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
return nil, partialColumns, nil
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, nil
}
// fetchCellsAndProofsFromExecution fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) (bitfield.Bitlist /* included parts */, [][]kzg.Cell, [][]kzg.Proof, error) {
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) ([][]kzg.Cell, [][]kzg.Proof, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -731,34 +732,24 @@ func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommi
versionedHashes = append(versionedHashes, versionedHash)
}
var blobAndProofs []*pb.BlobAndProofV2
// Fetch all blobsAndCellsProofs from the execution client.
var err error
useV3 := s.capabilityCache.has(GetBlobsV3)
if useV3 {
// v3 can return a partial response. V2 is all or nothing
blobAndProofs, err = s.GetBlobsV3(ctx, versionedHashes)
} else {
blobAndProofs, err = s.GetBlobsV2(ctx, versionedHashes)
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, nil, errors.Wrapf(err, "get blobs V2")
}
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "get blobs V2/3")
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
return nil, nil, nil
}
// Compute cells and proofs from the blobs and cell proofs.
included, cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(kzgCommitments)), blobAndProofs)
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobAndProofV2s)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
}
if included.Count() == uint64(len(kzgCommitments)) {
getBlobsV3CompleteResponsesTotal.Inc()
} else if included.Count() > 0 {
getBlobsV3PartialResponsesTotal.Inc()
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.

View File

@@ -2587,7 +2587,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
@@ -2598,7 +2598,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2611,7 +2611,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})

View File

@@ -0,0 +1,134 @@
package execution
import (
"strings"
"sync"
"github.com/OffchainLabs/prysm/v7/runtime/version"
)
const (
// CLCode is the two-letter client code for Prysm.
CLCode = "PR"
Name = "Prysm"
)
// GraffitiInfo holds version information for generating block graffiti.
// It is thread-safe and can be updated by the execution service and read by the validator server.
type GraffitiInfo struct {
mu sync.RWMutex
elCode string // From engine_getClientVersionV1
elCommit string // From engine_getClientVersionV1
logOnce sync.Once
}
// NewGraffitiInfo creates a new GraffitiInfo.
func NewGraffitiInfo() *GraffitiInfo {
return &GraffitiInfo{}
}
// UpdateFromEngine updates the EL client information.
func (g *GraffitiInfo) UpdateFromEngine(code, commit string) {
g.mu.Lock()
defer g.mu.Unlock()
g.elCode = code
g.elCommit = strings.TrimPrefix(commit, "0x")
}
// GenerateGraffiti generates graffiti using the flexible standard
// with the provided user graffiti from the validator client request.
// It places user graffiti first, then appends as much client info as space allows.
//
// A space separator is added between user graffiti and client info when it
// fits without reducing the client version tier.
//
// Available Space | Format
// ≥13 bytes | user + space + EL(2)+commit(4)+CL(2)+commit(4) e.g. "Sushi GEabcdPRe4f6"
// 12 bytes | user + EL(2)+commit(4)+CL(2)+commit(4) e.g. "12345678901234567890GEabcdPRe4f6"
// 9-11 bytes | user + space + EL(2)+commit(2)+CL(2)+commit(2) e.g. "12345678901234567890123 GEabPRe4"
// 8 bytes | user + EL(2)+commit(2)+CL(2)+commit(2) e.g. "123456789012345678901234GEabPRe4"
// 5-7 bytes | user + space + EL(2)+CL(2) e.g. "123456789012345678901234567 GEPR"
// 4 bytes | user + EL(2)+CL(2) e.g. "1234567890123456789012345678GEPR"
// 3 bytes | user + space + code(2) e.g. "12345678901234567890123456789 GE"
// 2 bytes | user + code(2) e.g. "123456789012345678901234567890GE"
// <2 bytes | user only e.g. "1234567890123456789012345678901x"
func (g *GraffitiInfo) GenerateGraffiti(userGraffiti []byte) [32]byte {
g.mu.RLock()
defer g.mu.RUnlock()
var result [32]byte
userStr := string(userGraffiti)
// Trim trailing null bytes
for len(userStr) > 0 && userStr[len(userStr)-1] == 0 {
userStr = userStr[:len(userStr)-1]
}
available := 32 - len(userStr)
clCommit := version.GetCommitPrefix()
clCommit4 := truncateCommit(clCommit, 4)
clCommit2 := truncateCommit(clCommit, 2)
// If no EL info, clear EL commits but still include CL info
var elCommit4, elCommit2 string
if g.elCode != "" {
elCommit4 = truncateCommit(g.elCommit, 4)
elCommit2 = truncateCommit(g.elCommit, 2)
}
// Add a space separator between user graffiti and client info,
// but only if it won't reduce the space available for client version info.
space := func(minForTier int) string {
if len(userStr) > 0 && available >= minForTier+1 {
return " "
}
return ""
}
var graffiti string
switch {
case available >= 12:
// Full: user+EL(2)+commit(4)+CL(2)+commit(4)
graffiti = userStr + space(12) + g.elCode + elCommit4 + CLCode + clCommit4
case available >= 8:
// Reduced commits: user+EL(2)+commit(2)+CL(2)+commit(2)
graffiti = userStr + space(8) + g.elCode + elCommit2 + CLCode + clCommit2
case available >= 4:
// Codes only: user+EL(2)+CL(2)
graffiti = userStr + space(4) + g.elCode + CLCode
case available >= 2:
// Single code: user+code(2)
if g.elCode != "" {
graffiti = userStr + space(2) + g.elCode
} else {
graffiti = userStr + space(2) + CLCode
}
default:
// User graffiti only
graffiti = userStr
}
g.logOnce.Do(func() {
logGraffitiInfo(graffiti, available)
})
copy(result[:], graffiti)
return result
}
// logGraffitiInfo logs the graffiti that will be used.
func logGraffitiInfo(graffiti string, available int) {
if available >= 2 {
log.WithField("graffiti", graffiti).Info("Graffiti includes client version info appended after user graffiti")
return
}
log.WithField("graffiti", graffiti).Info("Prysm adds consensus and execution debugging information to the end of the graffiti field when possible. To prevent deletion of debugging info, please consider using a shorter graffiti")
}
// truncateCommit returns the first n characters of the commit string.
func truncateCommit(commit string, n int) string {
if len(commit) <= n {
return commit
}
return commit[:n]
}

View File

@@ -0,0 +1,250 @@
package execution
import (
"testing"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestGraffitiInfo_GenerateGraffiti(t *testing.T) {
tests := []struct {
name string
elCode string
elCommit string
userGraffiti []byte
wantPrefix string // user graffiti appears first
wantSuffix string // client version info appended after
}{
// No EL info cases (CL info "PR" + commit still included when space allows)
{
name: "No EL - empty user graffiti",
elCode: "",
elCommit: "",
userGraffiti: []byte{},
wantPrefix: "PR", // Only CL code + commit (no user graffiti to prefix)
},
{
name: "No EL - short user graffiti",
elCode: "",
elCommit: "",
userGraffiti: []byte("my validator"),
wantPrefix: "my validator",
wantSuffix: " PR", // space + CL code appended
},
{
name: "No EL - 28 char user graffiti (4 bytes available)",
elCode: "",
elCommit: "",
userGraffiti: []byte("1234567890123456789012345678"), // 28 chars, 4 bytes available = codes only
wantPrefix: "1234567890123456789012345678",
wantSuffix: "PR", // CL code (no EL, so just PR)
},
{
name: "No EL - 30 char user graffiti (2 bytes available)",
elCode: "",
elCommit: "",
userGraffiti: []byte("123456789012345678901234567890"), // 30 chars, 2 bytes available = fits PR
wantPrefix: "123456789012345678901234567890",
wantSuffix: "PR",
},
{
name: "No EL - 31 char user graffiti (1 byte available)",
elCode: "",
elCommit: "",
userGraffiti: []byte("1234567890123456789012345678901"), // 31 chars, 1 byte available = not enough for code
wantPrefix: "1234567890123456789012345678901", // User only
},
{
name: "No EL - 32 char user graffiti (0 bytes available)",
elCode: "",
elCommit: "",
userGraffiti: []byte("12345678901234567890123456789012"),
wantPrefix: "12345678901234567890123456789012", // User only
},
// With EL info - flexible standard format cases
{
name: "With EL - full format (empty user graffiti)",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte{},
wantPrefix: "GEabcdPR", // No user graffiti, starts with client info
},
{
name: "With EL - full format (short user graffiti)",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("Bob"),
wantPrefix: "Bob",
wantSuffix: " GEabcdPR", // space + EL(2)+commit(4)+CL(2)+commit(4)
},
{
name: "With EL - full format (20 char user, 12 bytes available) - no space, would reduce tier",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("12345678901234567890"), // 20 chars, leaves exactly 12 bytes = full format, no room for space
wantPrefix: "12345678901234567890",
wantSuffix: "GEabcdPR",
},
{
name: "With EL - full format (19 char user, 13 bytes available) - space fits",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("1234567890123456789"), // 19 chars, leaves 13 bytes = full format + space
wantPrefix: "1234567890123456789",
wantSuffix: " GEabcdPR",
},
{
name: "With EL - reduced commits (24 char user, 8 bytes available) - no space, would reduce tier",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("123456789012345678901234"), // 24 chars, leaves exactly 8 bytes = reduced format, no room for space
wantPrefix: "123456789012345678901234",
wantSuffix: "GEabPR",
},
{
name: "With EL - reduced commits (23 char user, 9 bytes available) - space fits",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("12345678901234567890123"), // 23 chars, leaves 9 bytes = reduced format + space
wantPrefix: "12345678901234567890123",
wantSuffix: " GEabPR",
},
{
name: "With EL - codes only (28 char user, 4 bytes available) - no space, would reduce tier",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("1234567890123456789012345678"), // 28 chars, leaves exactly 4 bytes = codes only, no room for space
wantPrefix: "1234567890123456789012345678",
wantSuffix: "GEPR",
},
{
name: "With EL - codes only (27 char user, 5 bytes available) - space fits",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("123456789012345678901234567"), // 27 chars, leaves 5 bytes = codes only + space
wantPrefix: "123456789012345678901234567",
wantSuffix: " GEPR",
},
{
name: "With EL - EL code only (30 char user, 2 bytes available) - no space, would reduce tier",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("123456789012345678901234567890"), // 30 chars, leaves exactly 2 bytes = EL code only, no room for space
wantPrefix: "123456789012345678901234567890",
wantSuffix: "GE",
},
{
name: "With EL - EL code only (29 char user, 3 bytes available) - space fits",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("12345678901234567890123456789"), // 29 chars, leaves 3 bytes = EL code + space
wantPrefix: "12345678901234567890123456789",
wantSuffix: " GE",
},
{
name: "With EL - user only (31 char user, 1 byte available)",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("1234567890123456789012345678901"), // 31 chars, leaves 1 byte = not enough for code
wantPrefix: "1234567890123456789012345678901", // User only
},
{
name: "With EL - user only (32 char user, 0 bytes available)",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: []byte("12345678901234567890123456789012"),
wantPrefix: "12345678901234567890123456789012",
},
// Null byte handling
{
name: "Null bytes - input with trailing nulls",
elCode: "GE",
elCommit: "abcd1234",
userGraffiti: append([]byte("test"), 0, 0, 0),
wantPrefix: "test",
wantSuffix: " GEabcdPR",
},
// 0x prefix handling - some ELs return 0x-prefixed commits
{
name: "0x prefix - stripped from EL commit",
elCode: "GE",
elCommit: "0xabcd1234",
userGraffiti: []byte{},
wantPrefix: "GEabcdPR",
},
{
name: "No 0x prefix - commit used as-is",
elCode: "NM",
elCommit: "abcd1234",
userGraffiti: []byte{},
wantPrefix: "NMabcdPR",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewGraffitiInfo()
if tt.elCode != "" {
g.UpdateFromEngine(tt.elCode, tt.elCommit)
}
result := g.GenerateGraffiti(tt.userGraffiti)
resultStr := string(result[:])
trimmed := trimNullBytes(resultStr)
// Check prefix (user graffiti comes first)
require.Equal(t, true, len(trimmed) >= len(tt.wantPrefix), "Result too short for prefix check")
require.Equal(t, tt.wantPrefix, trimmed[:len(tt.wantPrefix)], "Prefix mismatch")
// Check suffix if specified (client version info appended)
if tt.wantSuffix != "" {
require.Equal(t, true, len(trimmed) >= len(tt.wantSuffix), "Result too short for suffix check")
// The suffix should appear somewhere after the prefix
afterPrefix := trimmed[len(tt.wantPrefix):]
require.Equal(t, true, len(afterPrefix) >= len(tt.wantSuffix), "Not enough room for suffix after prefix")
require.Equal(t, tt.wantSuffix, afterPrefix[:len(tt.wantSuffix)], "Suffix mismatch")
}
})
}
}
func TestGraffitiInfo_UpdateFromEngine(t *testing.T) {
g := NewGraffitiInfo()
// Initially no EL info - should still have CL info (PR + commit)
result := g.GenerateGraffiti([]byte{})
resultStr := trimNullBytes(string(result[:]))
require.Equal(t, "PR", resultStr[:2], "Expected CL info before update")
// Update with EL info
g.UpdateFromEngine("GE", "1234abcd")
result = g.GenerateGraffiti([]byte{})
resultStr = trimNullBytes(string(result[:]))
require.Equal(t, "GE1234PR", resultStr[:8], "Expected EL+CL info after update")
}
func TestTruncateCommit(t *testing.T) {
tests := []struct {
commit string
n int
want string
}{
{"abcd1234", 4, "abcd"},
{"ab", 4, "ab"},
{"", 4, ""},
{"abcdef", 2, "ab"},
}
for _, tt := range tests {
got := truncateCommit(tt.commit, tt.n)
require.Equal(t, tt.want, got)
}
}
func trimNullBytes(s string) string {
for len(s) > 0 && s[len(s)-1] == 0 {
s = s[:len(s)-1]
}
return s
}

View File

@@ -34,25 +34,6 @@ var (
Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000},
},
)
getBlobsV3RequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_requests_total",
Help: "Total number of engine_getBlobsV3 requests sent",
})
getBlobsV3CompleteResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_complete_responses_total",
Help: "Total number of complete engine_getBlobsV3 successful responses received",
})
getBlobsV3PartialResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_partial_responses_total",
Help: "Total number of engine_getBlobsV3 partial responses received",
})
getBlobsV3Latency = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "beacon_engine_getBlobsV3_request_duration_seconds",
Help: "Duration of engine_getBlobsV3 requests in seconds",
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 4},
},
)
errParseCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_parse_error_count",
Help: "The number of errors that occurred while parsing execution payload",

View File

@@ -124,3 +124,11 @@ func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return nil
}
}
// WithGraffitiInfo sets the GraffitiInfo for client version tracking.
func WithGraffitiInfo(g *GraffitiInfo) Option {
return func(s *Service) error {
s.graffitiInfo = g
return nil
}
}

View File

@@ -162,6 +162,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
blobVerifier verification.NewBlobVerifier
capabilityCache *capabilityCache
graffitiInfo *GraffitiInfo
}
// NewService sets up a new instance with an ethclient when given a web3 endpoint as a string in the config.
@@ -318,6 +319,28 @@ func (s *Service) updateConnectedETH1(state bool) {
s.updateBeaconNodeStats()
}
// GraffitiInfo returns the GraffitiInfo struct for graffiti generation.
func (s *Service) GraffitiInfo() *GraffitiInfo {
return s.graffitiInfo
}
// updateGraffitiInfo fetches EL client version and updates the graffiti info.
func (s *Service) updateGraffitiInfo() {
if s.graffitiInfo == nil {
return
}
ctx, cancel := context.WithTimeout(s.ctx, time.Second)
defer cancel()
versions, err := s.GetClientVersion(ctx)
if err != nil {
log.WithError(err).Debug("Could not get execution client version for graffiti")
return
}
if len(versions) >= 1 {
s.graffitiInfo.UpdateFromEngine(versions[0].Code, versions[0].Commit)
}
}
// refers to the latest eth1 block which follows the condition: eth1_timestamp +
// SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= current_unix_time
func (s *Service) followedBlockHeight(ctx context.Context) (uint64, error) {
@@ -598,6 +621,12 @@ func (s *Service) run(done <-chan struct{}) {
chainstartTicker := time.NewTicker(logPeriod)
defer chainstartTicker.Stop()
// Update graffiti info 4 times per epoch (~96 seconds with 12s slots and 32 slots/epoch)
graffitiTicker := time.NewTicker(96 * time.Second)
defer graffitiTicker.Stop()
// Initial update
s.updateGraffitiInfo()
for {
select {
case <-done:
@@ -622,6 +651,8 @@ func (s *Service) run(done <-chan struct{}) {
continue
}
s.logTillChainStart(context.Background())
case <-graffitiTicker.C:
s.updateGraffitiInfo()
}
}
}

View File

@@ -118,8 +118,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
}
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
return e.DataColumnSidecars, nil, e.ErrorDataColumnSidecars
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
}
// GetTerminalBlockHash --

View File

@@ -6,7 +6,6 @@ go_library(
"doc.go",
"errors.go",
"forkchoice.go",
"last_root.go",
"log.go",
"metrics.go",
"node.go",
@@ -51,7 +50,6 @@ go_test(
srcs = [
"ffg_update_test.go",
"forkchoice_test.go",
"last_root_test.go",
"no_vote_test.go",
"node_test.go",
"on_tick_test.go",

View File

@@ -32,7 +32,6 @@ func New() *ForkChoice {
finalizedCheckpoint: &forkchoicetypes.Checkpoint{},
proposerBoostRoot: [32]byte{},
nodeByRoot: make(map[[fieldparams.RootLength]byte]*Node),
nodeByPayload: make(map[[fieldparams.RootLength]byte]*Node),
slashedIndices: make(map[primitives.ValidatorIndex]bool),
receivedBlocksLastEpoch: [fieldparams.SlotsPerEpoch]primitives.Slot{},
}

View File

@@ -1,26 +0,0 @@
package doublylinkedtree
import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/time/slots"
)
// LastRoot returns the last canonical block root in the given epoch
func (f *ForkChoice) LastRoot(epoch primitives.Epoch) [32]byte {
head := f.store.headNode
headEpoch := slots.ToEpoch(head.slot)
epochEnd, err := slots.EpochEnd(epoch)
if err != nil {
return [32]byte{}
}
if headEpoch <= epoch {
return head.root
}
for head != nil && head.slot > epochEnd {
head = head.parent
}
if head == nil {
return [32]byte{}
}
return head.root
}

View File

@@ -1,38 +0,0 @@
package doublylinkedtree
import (
"testing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestLastRoot(t *testing.T) {
f := setup(0, 0)
ctx := t.Context()
st, root, err := prepareForkchoiceState(ctx, 1, [32]byte{'1'}, params.BeaconConfig().ZeroHash, [32]byte{'1'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 2, [32]byte{'2'}, [32]byte{'1'}, [32]byte{'2'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 3, [32]byte{'3'}, [32]byte{'1'}, [32]byte{'3'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 32, [32]byte{'4'}, [32]byte{'3'}, [32]byte{'4'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 33, [32]byte{'5'}, [32]byte{'2'}, [32]byte{'5'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
st, root, err = prepareForkchoiceState(ctx, 34, [32]byte{'6'}, [32]byte{'5'}, [32]byte{'6'}, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, st, root))
headNode := f.store.nodeByRoot[[32]byte{'6'}]
f.store.headNode = headNode
require.Equal(t, [32]byte{'6'}, f.store.headNode.root)
require.Equal(t, [32]byte{'2'}, f.LastRoot(0))
require.Equal(t, [32]byte{'6'}, f.LastRoot(1))
require.Equal(t, [32]byte{'6'}, f.LastRoot(2))
}

View File

@@ -94,6 +94,5 @@ func (s *Store) removeNodeAndChildren(ctx context.Context, node *Node, invalidRo
s.previousProposerBoostScore = 0
}
delete(s.nodeByRoot, node.root)
delete(s.nodeByPayload, node.payloadHash)
return invalidRoots, nil
}

View File

@@ -113,7 +113,6 @@ func (s *Store) insert(ctx context.Context,
}
}
s.nodeByPayload[payloadHash] = n
s.nodeByRoot[root] = n
if parent == nil {
if s.treeRootNode == nil {
@@ -122,7 +121,6 @@ func (s *Store) insert(ctx context.Context,
s.highestReceivedNode = n
} else {
delete(s.nodeByRoot, root)
delete(s.nodeByPayload, payloadHash)
return nil, errInvalidParentRoot
}
} else {
@@ -191,7 +189,6 @@ func (s *Store) pruneFinalizedNodeByRootMap(ctx context.Context, node, finalized
node.children = nil
delete(s.nodeByRoot, node.root)
delete(s.nodeByPayload, node.payloadHash)
return nil
}
@@ -273,21 +270,6 @@ func (f *ForkChoice) HighestReceivedBlockSlot() primitives.Slot {
return f.store.highestReceivedNode.slot
}
// HighestReceivedBlockDelay returns the number of slots that the highest
// received block was late when receiving it. For example, a block was late by 12 slots,
// then this method is expected to return 12.
func (f *ForkChoice) HighestReceivedBlockDelay() primitives.Slot {
n := f.store.highestReceivedNode
if n == nil {
return 0
}
sss, err := slots.SinceSlotStart(n.slot, f.store.genesisTime, n.timestamp)
if err != nil {
return 0
}
return primitives.Slot(uint64(sss/time.Second) / params.BeaconConfig().SecondsPerSlot)
}
// ReceivedBlocksLastEpoch returns the number of blocks received in the last epoch
func (f *ForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
count := uint64(0)

View File

@@ -128,10 +128,9 @@ func TestStore_Insert(t *testing.T) {
// The new node does not have a parent.
treeRootNode := &Node{slot: 0, root: indexToHash(0)}
nodeByRoot := map[[32]byte]*Node{indexToHash(0): treeRootNode}
nodeByPayload := map[[32]byte]*Node{indexToHash(0): treeRootNode}
jc := &forkchoicetypes.Checkpoint{Epoch: 0}
fc := &forkchoicetypes.Checkpoint{Epoch: 0}
s := &Store{nodeByRoot: nodeByRoot, treeRootNode: treeRootNode, nodeByPayload: nodeByPayload, justifiedCheckpoint: jc, finalizedCheckpoint: fc, highestReceivedNode: &Node{}}
s := &Store{nodeByRoot: nodeByRoot, treeRootNode: treeRootNode, justifiedCheckpoint: jc, finalizedCheckpoint: fc, highestReceivedNode: &Node{}}
payloadHash := [32]byte{'a'}
ctx := t.Context()
_, blk, err := prepareForkchoiceState(ctx, 100, indexToHash(100), indexToHash(0), payloadHash, 1, 1)
@@ -238,7 +237,6 @@ func TestStore_Prune_NoDanglingBranch(t *testing.T) {
s.finalizedCheckpoint.Root = indexToHash(1)
require.NoError(t, s.prune(t.Context()))
require.Equal(t, len(s.nodeByRoot), 1)
require.Equal(t, len(s.nodeByPayload), 1)
}
// This test starts with the following branching diagram
@@ -319,8 +317,6 @@ func TestStore_PruneMapsNodes(t *testing.T) {
s.finalizedCheckpoint.Root = indexToHash(1)
require.NoError(t, s.prune(t.Context()))
require.Equal(t, len(s.nodeByRoot), 1)
require.Equal(t, len(s.nodeByPayload), 1)
}
func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
@@ -339,7 +335,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64
// Received block last epoch is 1
@@ -352,7 +347,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), count)
require.Equal(t, primitives.Slot(64), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay())
// 64 65
// Received block last epoch is 2
@@ -365,7 +359,6 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(2), count)
require.Equal(t, primitives.Slot(65), f.HighestReceivedBlockSlot())
require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockDelay())
// 64 65 66
// Received block last epoch is 3
@@ -717,17 +710,3 @@ func TestStore_CleanupInserting(t *testing.T) {
require.NotNil(t, f.InsertNode(ctx, st, blk))
require.Equal(t, false, f.HasNode(blk.Root()))
}
func TestStore_HighestReceivedBlockDelay(t *testing.T) {
f := ForkChoice{
store: &Store{
genesisTime: time.Unix(0, 0),
highestReceivedNode: &Node{
slot: 10,
timestamp: time.Unix(int64(((10 + 12) * params.BeaconConfig().SecondsPerSlot)), 0), // 12 slots late
},
},
}
require.Equal(t, primitives.Slot(12), f.HighestReceivedBlockDelay())
}

View File

@@ -36,7 +36,6 @@ type Store struct {
treeRootNode *Node // the root node of the store tree.
headNode *Node // last head Node
nodeByRoot map[[fieldparams.RootLength]byte]*Node // nodes indexed by roots.
nodeByPayload map[[fieldparams.RootLength]byte]*Node // nodes indexed by payload Hash
slashedIndices map[primitives.ValidatorIndex]bool // the list of equivocating validator indices
originRoot [fieldparams.RootLength]byte // The genesis block root
genesisTime time.Time

View File

@@ -67,13 +67,11 @@ type FastGetter interface {
HasNode([32]byte) bool
HighestReceivedBlockSlot() primitives.Slot
HighestReceivedBlockRoot() [32]byte
HighestReceivedBlockDelay() primitives.Slot
IsCanonical(root [32]byte) bool
IsOptimistic(root [32]byte) (bool, error)
IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error)
JustifiedCheckpoint() *forkchoicetypes.Checkpoint
JustifiedPayloadBlockHash() [32]byte
LastRoot(primitives.Epoch) [32]byte
NodeCount() int
PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint
ProposerBoost() [fieldparams.RootLength]byte

View File

@@ -121,13 +121,6 @@ func (ro *ROForkChoice) HighestReceivedBlockRoot() [32]byte {
return ro.getter.HighestReceivedBlockRoot()
}
// HighestReceivedBlockDelay delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) HighestReceivedBlockDelay() primitives.Slot {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.HighestReceivedBlockDelay()
}
// ReceivedBlocksLastEpoch delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.l.RLock()
@@ -163,13 +156,6 @@ func (ro *ROForkChoice) Slot(root [32]byte) (primitives.Slot, error) {
return ro.getter.Slot(root)
}
// LastRoot delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) LastRoot(e primitives.Epoch) [32]byte {
ro.l.RLock()
defer ro.l.RUnlock()
return ro.getter.LastRoot(e)
}
// DependentRoot delegates to the underlying forkchoice call, under a lock.
func (ro *ROForkChoice) DependentRoot(epoch primitives.Epoch) ([32]byte, error) {
ro.l.RLock()

View File

@@ -30,7 +30,6 @@ const (
nodeCountCalled
highestReceivedBlockSlotCalled
highestReceivedBlockRootCalled
highestReceivedBlockDelayCalled
receivedBlocksLastEpochCalled
weightCalled
isOptimisticCalled
@@ -118,11 +117,6 @@ func TestROLocking(t *testing.T) {
call: highestReceivedBlockSlotCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockSlot() },
},
{
name: "highestReceivedBlockDelayCalled",
call: highestReceivedBlockDelayCalled,
cb: func(g FastGetter) { g.HighestReceivedBlockDelay() },
},
{
name: "receivedBlocksLastEpochCalled",
call: receivedBlocksLastEpochCalled,
@@ -148,11 +142,6 @@ func TestROLocking(t *testing.T) {
call: slotCalled,
cb: func(g FastGetter) { _, err := g.Slot([32]byte{}); _discard(t, err) },
},
{
name: "lastRootCalled",
call: lastRootCalled,
cb: func(g FastGetter) { g.LastRoot(0) },
},
{
name: "targetRootForEpochCalled",
call: targetRootForEpochCalled,
@@ -265,11 +254,6 @@ func (ro *mockROForkchoice) HighestReceivedBlockRoot() [32]byte {
return [32]byte{}
}
func (ro *mockROForkchoice) HighestReceivedBlockDelay() primitives.Slot {
ro.calls = append(ro.calls, highestReceivedBlockDelayCalled)
return 0
}
func (ro *mockROForkchoice) ReceivedBlocksLastEpoch() (uint64, error) {
ro.calls = append(ro.calls, receivedBlocksLastEpochCalled)
return 0, nil
@@ -295,11 +279,6 @@ func (ro *mockROForkchoice) Slot(_ [32]byte) (primitives.Slot, error) {
return 0, nil
}
func (ro *mockROForkchoice) LastRoot(_ primitives.Epoch) [32]byte {
ro.calls = append(ro.calls, lastRootCalled)
return [32]byte{}
}
// DependentRoot impoements FastGetter.
func (ro *mockROForkchoice) DependentRoot(_ primitives.Epoch) ([32]byte, error) {
ro.calls = append(ro.calls, dependentRootCalled)

View File

@@ -1,95 +0,0 @@
# Graffiti Version Info Implementation
## Summary
Add automatic EL+CL version info to block graffiti following [ethereum/execution-apis#517](https://github.com/ethereum/execution-apis/pull/517). Uses the [flexible standard](https://hackmd.io/@wmoBhF17RAOH2NZ5bNXJVg/BJX2c9gja) to pack client info into leftover space after user graffiti.
More details: https://github.com/ethereum/execution-apis/blob/main/src/engine/identification.md
## Implementation
### Core Component: GraffitiInfo Struct
Thread-safe struct holding version information:
```go
const clCode = "PR"
type GraffitiInfo struct {
mu sync.RWMutex
userGraffiti string // From --graffiti flag (set once at startup)
clCommit string // From version.GetCommitPrefix() helper function
elCode string // From engine_getClientVersionV1
elCommit string // From engine_getClientVersionV1
}
```
### Flow
1. **Startup**: Parse flags, create GraffitiInfo with user graffiti and CL info.
2. **Wiring**: Pass struct to both execution service and RPC validator server
3. **Runtime**: Execution service goroutine periodically calls `engine_getClientVersionV1` and updates EL fields
4. **Block Proposal**: RPC validator server calls `GenerateGraffiti()` to get formatted graffiti
### Flexible Graffiti Format
Packs as much client info as space allows (after user graffiti):
| Available Space | Format | Example |
|----------------|--------|---------|
| ≥12 bytes | `EL(2)+commit(4)+CL(2)+commit(4)+user` | `GE168dPR63afBob` |
| 8-11 bytes | `EL(2)+commit(2)+CL(2)+commit(2)+user` | `GE16PR63my node here` |
| 4-7 bytes | `EL(2)+CL(2)+user` | `GEPRthis is my graffiti msg` |
| 2-3 bytes | `EL(2)+user` | `GEalmost full graffiti message` |
| <2 bytes | user only | `full 32 byte user graffiti here` |
```go
func (g *GraffitiInfo) GenerateGraffiti() [32]byte {
available := 32 - len(userGraffiti)
if elCode == "" {
elCommit2 = elCommit4 = ""
}
switch {
case available >= 12:
return elCode + elCommit4 + clCode + clCommit4 + userGraffiti
case available >= 8:
return elCode + elCommit2 + clCode + clCommit2 + userGraffiti
case available >= 4:
return elCode + clCode + userGraffiti
case available >= 2:
return elCode + userGraffiti
default:
return userGraffiti
}
}
```
### Update Logic
Single testable function in execution service:
```go
func (s *Service) updateGraffitiInfo() {
versions, err := s.GetClientVersion(ctx)
if err != nil {
return // Keep last good value
}
if len(versions) == 1 {
s.graffitiInfo.UpdateFromEngine(versions[0].Code, versions[0].Commit)
}
}
```
Goroutine calls this on `slot % 8 == 4` timing (4 times per epoch, avoids slot boundaries).
### Files Changes Required
**New:**
- `beacon-chain/execution/graffiti_info.go` - The struct and methods
- `beacon-chain/execution/graffiti_info_test.go` - Unit tests
- `runtime/version/version.go` - Add `GetCommitPrefix()` helper that extracts first 4 hex chars from the git commit injected via Bazel ldflags at build time
**Modified:**
- `beacon-chain/execution/service.go` - Add goroutine + updateGraffitiInfo()
- `beacon-chain/execution/engine_client.go` - Add GetClientVersion() method that does engine call
- `beacon-chain/rpc/.../validator/proposer.go` - Call GenerateGraffiti()
- `beacon-chain/node/node.go` - Wire GraffitiInfo to services
### Testing Strategy
- Unit test GraffitiInfo methods (priority logic, thread safety)
- Unit test updateGraffitiInfo() with mocked engine client

View File

@@ -678,7 +678,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DB: b.db,
StateGen: b.stateGen,
ClockWaiter: b.ClockWaiter,
PartialDataColumns: b.cliCtx.Bool(flags.PartialDataColumns.Name),
})
if err != nil {
return err
@@ -786,6 +785,9 @@ func (b *BeaconNode) registerPOWChainService() error {
return err
}
// Create GraffitiInfo for client version tracking in block graffiti
graffitiInfo := execution.NewGraffitiInfo()
// skipcq: CRT-D0001
opts := append(
b.serviceFlagOpts.executionChainFlagOpts,
@@ -798,6 +800,7 @@ func (b *BeaconNode) registerPOWChainService() error {
execution.WithFinalizedStateAtStartup(b.finalizedStateAtStartUp),
execution.WithJwtId(b.cliCtx.String(flags.JwtId.Name)),
execution.WithVerifierWaiter(b.verifyInitWaiter),
execution.WithGraffitiInfo(graffitiInfo),
)
web3Service, err := execution.NewService(b.ctx, opts...)
if err != nil {
@@ -1004,6 +1007,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
TrackedValidatorsCache: b.trackedValidatorsCache,
PayloadIDCache: b.payloadIDCache,
LCStore: b.lcStore,
GraffitiInfo: web3Service.GraffitiInfo(),
})
return b.services.RegisterService(rpcService)

View File

@@ -52,7 +52,6 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",

View File

@@ -343,7 +343,7 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
// cancelled (whichever comes first).
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error {
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
@@ -353,15 +353,16 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
return errors.Wrap(err, "current fork digest")
}
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars, partialColumns)
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
return nil
}
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) {
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network.
// For sidecars with available peers, it uses batch publishing.
// For sidecars without peers, it finds peers first and then publishes individually.
// Both paths run in parallel. It returns when all broadcasts are complete, or the context is cancelled.
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
@@ -371,8 +372,8 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
topicFunc := func(dcIndex uint64) (topic string, wrappedSubIdx uint64, subnet uint64) {
subnet = peerdas.ComputeSubnetForDataColumnSidecar(dcIndex)
topicFunc := func(sidecar blocks.VerifiedRODataColumn) (topic string, wrappedSubIdx uint64, subnet uint64) {
subnet = peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
topic = dataColumnSubnetToTopic(subnet, forkDigest)
wrappedSubIdx = subnet + dataColumnSubnetVal
return
@@ -385,7 +386,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
topic, wrappedSubIdx, _ := topicFunc(sidecar.Index)
topic, wrappedSubIdx, _ := topicFunc(sidecar)
// Check if we have a peer for this subnet (use RLock for read-only check).
mu := s.subnetLocker(wrappedSubIdx)
mu.RLock()
@@ -410,7 +411,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, _, _ := topicFunc(sidecar.Index)
topic, _, _ := topicFunc(sidecar)
if err := s.batchObject(ctx, &messageBatch, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
@@ -418,10 +419,6 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
return
}
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
// Record the timing for log purposes.
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
@@ -436,7 +433,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, wrappedSubIdx, subnet := topicFunc(sidecar.Index)
topic, wrappedSubIdx, subnet := topicFunc(sidecar)
// Find peers for this sidecar's subnet.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
@@ -461,32 +458,6 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
})
}
if s.partialColumnBroadcaster != nil {
// Note: There is not batch publish for partial columns.
for _, partialColumn := range partialColumns {
individualWg.Go(func() {
_, span := trace.StartSpan(ctx, "p2p.broadcastPartialDataColumn")
ctx := trace.NewContext(s.ctx, span)
defer span.End()
topic, wrappedSubIdx, subnet := topicFunc(partialColumn.Index)
// Find peers for this sidecar's subnet.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
}
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}
})
}
}
// Wait for batch to be populated, then publish.
batchWg.Wait()
if len(sidecarsWithPeers) > 0 {

View File

@@ -803,7 +803,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar}, nil)
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
require.NoError(t, err)
// Receive the message.
@@ -867,7 +867,7 @@ func (*rpcOrderTracer) DeliverMessage(*pubsub.Message) {}
func (*rpcOrderTracer) RejectMessage(*pubsub.Message, string) {}
func (*rpcOrderTracer) DuplicateMessage(*pubsub.Message) {}
func (*rpcOrderTracer) ThrottlePeer(peer.ID) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC) {}
func (*rpcOrderTracer) DropRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) UndeliverableMessage(*pubsub.Message) {}
@@ -969,7 +969,7 @@ func TestService_BroadcastDataColumnRoundRobin(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Broadcast all sidecars.
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars, nil)
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars)
require.NoError(t, err)
// Give some time for messages to be sent.
time.Sleep(100 * time.Millisecond)

View File

@@ -26,7 +26,6 @@ const (
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
PartialDataColumns bool
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool

View File

@@ -25,6 +25,8 @@ var gossipTopicMappings = map[string]func() proto.Message{
LightClientOptimisticUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientOptimisticUpdateAltair{} },
LightClientFinalityUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientFinalityUpdateAltair{} },
DataColumnSubnetTopicFormat: func() proto.Message { return &ethpb.DataColumnSidecar{} },
PayloadAttestationMessageTopicFormat: func() proto.Message { return &ethpb.PayloadAttestationMessage{} },
ExecutionPayloadEnvelopeTopicFormat: func() proto.Message { return &ethpb.SignedExecutionPayloadEnvelope{} },
}
// GossipTopicMappings is a function to return the assigned data type
@@ -144,4 +146,7 @@ func init() {
// Specially handle Fulu objects.
GossipTypeMapping[reflect.TypeFor[*ethpb.SignedBeaconBlockFulu]()] = BlockSubnetTopicFormat
// Payload attestation messages.
GossipTypeMapping[reflect.TypeFor[*ethpb.PayloadAttestationMessage]()] = PayloadAttestationMessageTopicFormat
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -29,7 +28,6 @@ type (
Broadcaster
SetStreamHandler
PubSubProvider
PartialColumnBroadcasterProvider
PubSubTopicUser
SenderEncoder
PeerManager
@@ -54,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -94,11 +92,6 @@ type (
PubSub() *pubsub.PubSub
}
// PubSubProvider provides the p2p pubsub protocol.
PartialColumnBroadcasterProvider interface {
PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// PeerManager abstracts some peer management methods from libp2p.
PeerManager interface {
Disconnect(peer.ID) error

View File

@@ -157,11 +157,6 @@ var (
Help: "The number of publish messages received via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubRecvSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_pub_size_total",
Help: "The total size of publish messages received via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_total",
Help: "The number of messages dropped via rpc for a particular control message",
@@ -176,11 +171,6 @@ var (
Help: "The number of publish messages dropped via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubDropSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_pub_size_total",
Help: "The total size of publish messages dropped via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_total",
Help: "The number of messages sent via rpc for a particular control message",
@@ -195,16 +185,6 @@ var (
Help: "The number of publish messages sent via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
Help: "The total size of publish messages sent via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gossipsub_mesh_peers",
Help: "The number of capable peers in mesh",
},
[]string{"topic", "supports_partial"})
)
func (s *Service) updateMetrics() {

View File

@@ -1,27 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"metrics.go",
"partial.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster",
visibility = ["//visibility:public"],
deps = [
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//internal/logrusadapter:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,25 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_test")
go_test(
name = "go_default_test",
size = "medium",
srcs = ["two_node_test.go"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//x/simlibp2p:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_marcopolo_simnet//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,239 +0,0 @@
package integrationtest
import (
"context"
"crypto/rand"
"fmt"
"testing"
"testing/synctest"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
simlibp2p "github.com/libp2p/go-libp2p/x/simlibp2p"
"github.com/marcopolo/simnet"
"github.com/sirupsen/logrus"
)
// TestTwoNodePartialColumnExchange tests that two nodes can exchange partial columns
// and reconstruct the complete column. Node 1 has cells 0-2, Node 2 has cells 3-5.
// After exchange, both should have all cells.
func TestTwoNodePartialColumnExchange(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// Create a simulated libp2p network
latency := time.Millisecond * 10
network, meta, err := simlibp2p.SimpleLibp2pNetwork([]simlibp2p.NodeLinkSettingsAndCount{
{LinkSettings: simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
Uplink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
}, Count: 2},
}, simlibp2p.NetworkSettings{UseBlankHost: true})
require.NoError(t, err)
require.NoError(t, network.Start())
defer func() {
require.NoError(t, network.Close())
}()
defer func() {
for _, node := range meta.Nodes {
err := node.Close()
if err != nil {
panic(err)
}
}
}()
h1 := meta.Nodes[0]
h2 := meta.Nodes[1]
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
broadcaster1 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
broadcaster2 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
opts1 := broadcaster1.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
opts2 := broadcaster2.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps1, err := pubsub.NewGossipSub(ctx, h1, opts1...)
require.NoError(t, err)
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
}()
// Generate Test Data
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], []byte("test-block-root"))
numCells := 6
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
commitments[i] = make([]byte, 48)
cells[i] = make([]byte, 2048)
_, err := rand.Read(cells[i])
require.NoError(t, err)
proofs[i] = make([]byte, 48)
_ = fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
BodyRoot: blockRoot[:],
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
pc1, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
pc2, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
// Setup Topic and Subscriptions
digest := params.ForkDigest(0)
columnIndex := uint64(12)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topicStr := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) +
encoder.SszNetworkEncoder{}.ProtocolSuffix()
time.Sleep(100 * time.Millisecond)
topic1, err := ps1.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator that verifies the inclusion proof
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
}
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return true, fmt.Errorf("nil signed block header")
}
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
cellValidator := func(_ []blocks.CellProofBundle) error {
return nil
}
node1Complete := make(chan blocks.VerifiedRODataColumn, 1)
node2Complete := make(chan blocks.VerifiedRODataColumn, 1)
handler1 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 1: Completed! Column has %d cells", len(col.Column))
node1Complete <- col
}
handler2 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 2: Completed! Column has %d cells", len(col.Column))
node2Complete <- col
}
// Connect hosts
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
time.Sleep(300 * time.Millisecond)
// Subscribe to regular GossipSub (critical for partial message RPC exchange!)
sub1, err := topic1.Subscribe()
require.NoError(t, err)
defer sub1.Cancel()
sub2, err := topic2.Subscribe()
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
require.NoError(t, err)
// Wait for mesh to form
time.Sleep(2 * time.Second)
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2)
require.NoError(t, err)
// Wait for Completion
timeout := time.After(10 * time.Second)
var col1, col2 blocks.VerifiedRODataColumn
receivedCount := 0
for receivedCount < 2 {
select {
case col1 = <-node1Complete:
t.Log("Node 1 completed reconstruction")
receivedCount++
case col2 = <-node2Complete:
t.Log("Node 2 completed reconstruction")
receivedCount++
case <-timeout:
t.Fatalf("Timeout: Only %d/2 nodes completed", receivedCount)
}
}
// Verify both columns have all cells
assert.Equal(t, numCells, len(col1.Column), "Node 1 should have all cells")
assert.Equal(t, numCells, len(col2.Column), "Node 2 should have all cells")
assert.DeepSSZEqual(t, cells, col1.Column, "Node 1 cell mismatch")
assert.DeepSSZEqual(t, cells, col2.Column, "Node 2 cell mismatch")
})
}

View File

@@ -1,9 +0,0 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package partialdatacolumnbroadcaster
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "beacon-chain/p2p/partialdatacolumnbroadcaster")

View File

@@ -1,18 +0,0 @@
package partialdatacolumnbroadcaster
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
partialMessageUsefulCellsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_useful_cells_total",
Help: "Number of useful cells received via a partial message",
}, []string{"column_index"})
partialMessageCellsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_cells_received_total",
Help: "Number of total cells received via a partial message",
}, []string{"column_index"})
)

View File

@@ -1,540 +0,0 @@
package partialdatacolumnbroadcaster
import (
"bytes"
"log/slog"
"regexp"
"strconv"
"time"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// TODOs:
// different eager push strategies:
// - no eager push
// - full column eager push
// - With debouncing - some factor of RTT
// - eager push missing cells
const TTLInSlots = 3
const maxConcurrentValidators = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
func extractColumnIndexFromTopic(topic string) (uint64, error) {
matches := dataColumnTopicRegex.FindStringSubmatch(topic)
if len(matches) < 2 {
return 0, errors.New("could not extract column index from topic")
}
return strconv.ParseUint(matches[1], 10, 64)
}
// HeaderValidator validates a PartialDataColumnHeader.
// Returns (reject, err) where:
// - reject=true, err!=nil: REJECT - peer should be penalized
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
logger *logrus.Logger
ps *pubsub.PubSub
stop chan struct{}
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
// map topic -> handler
handlers map[string]SubHandler
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
groupTTL map[string]int8
// validHeaderCache caches validated headers by group ID (works across topics)
validHeaderCache map[string]*ethpb.PartialDataColumnHeader
incomingReq chan request
}
type requestKind uint8
const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindHandleIncomingRPC
requestKindCellsValidated
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
}
type publish struct {
topic string
c blocks.PartialDataColumn
}
type subscribe struct {
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
}
type unsubscribe struct {
topic string
}
type rpcWithFrom struct {
*pubsub_pb.PartialMessagesExtension
from peer.ID
}
type cellsValidated struct {
validationTook time.Duration
topic string
group []byte
cellIndices []uint64
cells []blocks.CellProofBundle
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
}
}
// AppendPubSubOpts adds the necessary pubsub options to enable partial messages.
func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubsub.Option {
slogger := slog.New(logrusadapter.Handler{Logger: p.logger})
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return merged
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
incomingRPC: rpcWithFrom{rpc, from},
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
}
return nil
},
}),
func(ps *pubsub.PubSub) error {
p.ps = ps
return nil
},
)
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
if p.stop != nil {
return
}
p.stop = make(chan struct{})
p.loop()
}
func (p *PartialColumnBroadcaster) loop() {
cleanup := time.NewTicker(time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot))
defer cleanup.Stop()
for {
select {
case <-p.stop:
return
case <-cleanup.C:
for groupID, ttl := range p.groupTTL {
if ttl > 0 {
p.groupTTL[groupID] = ttl - 1
continue
}
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
for topic, msgStore := range p.partialMsgStore {
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
}
}
}
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
p.logger.Error("Failed to handle incoming partial RPC", "err", err)
}
case requestKindCellsValidated:
err := p.handleCellsValidated(req.cellsValidated)
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
}
}
}
func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *blocks.PartialDataColumn {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
return nil
}
msg, ok := topicStore[string(group)]
if !ok {
return nil
}
return msg
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
hasMessage := len(rpcWithFrom.PartialMessage) > 0
var message ethpb.PartialDataColumnSidecar
if hasMessage {
err := message.UnmarshalSSZ(rpcWithFrom.PartialMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal partial message data")
}
}
topicID := rpcWithFrom.GetTopicID()
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
// Check cache first for this group
if cachedHeader, ok := p.validHeaderCache[string(groupID)]; ok {
header = cachedHeader
} else {
// We haven't seen this group before. Check if we have a valid header.
if len(message.Header) == 0 {
p.logger.Debug("No partial column found and no header in message, ignoring")
return nil
}
header = message.Header[0]
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
// REJECT case: penalize the peer
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
}
// Both REJECT and IGNORE: don't process further
return nil
}
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
if err != nil {
return err
}
newColumn, err := blocks.NewPartialDataColumn(
header.SignedBlockHeader,
columnIndex,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
if err != nil {
p.logger.WithError(err).WithFields(logrus.Fields{
"topic": topicID,
"columnIndex": columnIndex,
"numCommitments": len(header.KzgCommitments),
}).Error("Failed to create partial data column from header")
return err
}
// Save to store
topicStore, ok := p.partialMsgStore[topicID]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topicID] = topicStore
}
topicStore[string(newColumn.GroupID())] = &newColumn
p.groupTTL[string(newColumn.GroupID())] = TTLInSlots
ourDataColumn = &newColumn
shouldRepublish = true
}
if ourDataColumn == nil {
// We don't have a partial column for this. Can happen if we got cells
// without a header.
return nil
}
logger := p.logger.WithFields(logrus.Fields{
"from": rpcWithFrom.from,
"topic": topicID,
"group": groupID,
})
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
// Marco's thoughts. No, we don't need to do anything else here.
cellIndices, cellsToVerify, err := ourDataColumn.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return err
}
// Track cells received via partial message
if len(cellIndices) > 0 {
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
partialMessageCellsReceivedTotal.WithLabelValues(columnIndexStr).Add(float64(len(cellIndices)))
}
if len(cellsToVerify) > 0 {
p.concurrentValidatorSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := validator(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
return
}
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackUsefulMessage)
p.incomingReq <- request{
kind: requestKindCellsValidated,
cellsValidated: &cellsValidated{
validationTook: time.Since(start),
topic: topicID,
group: groupID,
cells: cellsToVerify,
cellIndices: cellIndices,
},
}
}()
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) error {
ourDataColumn := p.getDataColumn(cells.topic, cells.group)
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
if extended {
// Track useful cells (cells that extended our data)
partialMessageUsefulCellsTotal.WithLabelValues(columnIndexStr).Add(float64(len(cells.cells)))
// TODO: we could use the heuristic here that if this data was
// useful to us, it's likely useful to our peers and we should
// republish eagerly
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
p.stop = nil
}
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
topicStore[string(c.GroupID())] = &c
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
func (p *PartialColumnBroadcaster) Unsubscribe(topic string) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindUnsubscribe,
unsub: unsubscribe{
topic: topic,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
t, ok := p.topics[topic]
if !ok {
return errors.New("topic not found")
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}

View File

@@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")
assert.Equal(t, address, resAddress, "Unexpected address")
resDirection, err := p.Direction(id)
require.NoError(t, err)
@@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")
assert.Equal(t, address2, resAddress2, "Unexpected address")
resDirection2, err := p.Direction(id)
require.NoError(t, err)

View File

@@ -170,7 +170,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams(s.cfg.IPColocationWhitelist)),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(&gossipTracer{host: s.host}),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
}
if len(s.cfg.StaticPeers) > 0 {
@@ -181,9 +181,6 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}
psOpts = append(psOpts, pubsub.WithDirectPeers(directPeersAddrInfos))
}
if s.partialColumnBroadcaster != nil {
psOpts = s.partialColumnBroadcaster.AppendPubSubOpts(psOpts)
}
return psOpts
}

View File

@@ -1,8 +1,6 @@
package p2p
import (
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@@ -10,7 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
var _ = pubsub.RawTracer(&gossipTracer{})
var _ = pubsub.RawTracer(gossipTracer{})
// Initializes the values for the pubsub rpc action.
type action int
@@ -25,146 +23,85 @@ const (
// and broadcasted through gossipsub.
type gossipTracer struct {
host host.Host
mu sync.Mutex
// map topic -> Set(peerID). Peer is in set if it supports partial messages.
partialMessagePeers map[string]map[peer.ID]struct{}
// map topic -> Set(peerID). Peer is in set if in the mesh.
meshPeers map[string]map[peer.ID]struct{}
}
// AddPeer .
func (g *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
// no-op
}
// RemovePeer .
func (g *gossipTracer) RemovePeer(p peer.ID) {
g.mu.Lock()
defer g.mu.Unlock()
for _, peers := range g.partialMessagePeers {
delete(peers, p)
}
for topic, peers := range g.meshPeers {
if _, ok := peers[p]; ok {
delete(peers, p)
g.updateMeshPeersMetric(topic)
}
}
func (g gossipTracer) RemovePeer(p peer.ID) {
// no-op
}
// Join .
func (g *gossipTracer) Join(topic string) {
func (g gossipTracer) Join(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(1)
g.mu.Lock()
defer g.mu.Unlock()
if g.partialMessagePeers == nil {
g.partialMessagePeers = make(map[string]map[peer.ID]struct{})
}
if g.partialMessagePeers[topic] == nil {
g.partialMessagePeers[topic] = make(map[peer.ID]struct{})
}
if g.meshPeers == nil {
g.meshPeers = make(map[string]map[peer.ID]struct{})
}
if g.meshPeers[topic] == nil {
g.meshPeers[topic] = make(map[peer.ID]struct{})
}
}
// Leave .
func (g *gossipTracer) Leave(topic string) {
func (g gossipTracer) Leave(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(0)
g.mu.Lock()
defer g.mu.Unlock()
delete(g.partialMessagePeers, topic)
delete(g.meshPeers, topic)
}
// Graft .
func (g *gossipTracer) Graft(p peer.ID, topic string) {
func (g gossipTracer) Graft(p peer.ID, topic string) {
pubsubTopicsGraft.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
m[p] = struct{}{}
}
g.updateMeshPeersMetric(topic)
}
// Prune .
func (g *gossipTracer) Prune(p peer.ID, topic string) {
func (g gossipTracer) Prune(p peer.ID, topic string) {
pubsubTopicsPrune.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
delete(m, p)
}
g.updateMeshPeersMetric(topic)
}
// ValidateMessage .
func (g *gossipTracer) ValidateMessage(msg *pubsub.Message) {
func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
pubsubMessageValidate.WithLabelValues(*msg.Topic).Inc()
}
// DeliverMessage .
func (g *gossipTracer) DeliverMessage(msg *pubsub.Message) {
func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
pubsubMessageDeliver.WithLabelValues(*msg.Topic).Inc()
}
// RejectMessage .
func (g *gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
pubsubMessageReject.WithLabelValues(*msg.Topic, reason).Inc()
}
// DuplicateMessage .
func (g *gossipTracer) DuplicateMessage(msg *pubsub.Message) {
func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
pubsubMessageDuplicate.WithLabelValues(*msg.Topic).Inc()
}
// UndeliverableMessage .
func (g *gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
pubsubMessageUndeliverable.WithLabelValues(*msg.Topic).Inc()
}
// ThrottlePeer .
func (g *gossipTracer) ThrottlePeer(p peer.ID) {
func (g gossipTracer) ThrottlePeer(p peer.ID) {
agent := agentFromPid(p, g.host.Peerstore())
pubsubPeerThrottle.WithLabelValues(agent).Inc()
}
// RecvRPC .
func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC, from peer.ID) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCPubRecvSize, pubsubRPCRecv, rpc)
g.mu.Lock()
defer g.mu.Unlock()
for _, sub := range rpc.Subscriptions {
m, ok := g.partialMessagePeers[sub.GetTopicid()]
if !ok {
continue
}
if sub.GetSubscribe() && sub.GetRequestsPartial() {
m[from] = struct{}{}
} else {
delete(m, from)
}
}
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCRecv, rpc)
}
// SendRPC .
func (g *gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCPubSentSize, pubsubRPCSent, rpc)
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCSent, rpc)
}
// DropRPC .
func (g *gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCPubDropSize, pubsubRPCDrop, rpc)
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCDrop, rpc)
}
func (g *gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, pubSizeCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
subCtr.Add(float64(len(rpc.Subscriptions)))
if rpc.Control != nil {
ctrlCtr.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
@@ -173,41 +110,12 @@ func (g *gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, p
ctrlCtr.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
ctrlCtr.WithLabelValues("idontwant").Add(float64(len(rpc.Control.Idontwant)))
}
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
return
}
for _, msg := range rpc.Publish {
pubCtr.WithLabelValues(msg.GetTopic()).Inc()
pubSizeCtr.WithLabelValues(msg.GetTopic(), "false").Add(float64(msg.Size()))
}
if rpc.Partial != nil {
pubCtr.WithLabelValues(rpc.Partial.GetTopicID()).Inc()
pubSizeCtr.WithLabelValues(rpc.Partial.GetTopicID(), "true").Add(float64(rpc.Partial.Size()))
}
}
// updateMeshPeersMetric requires the caller to hold the state mutex
func (g *gossipTracer) updateMeshPeersMetric(topic string) {
meshPeers, ok := g.meshPeers[topic]
if !ok {
return
}
partialPeers, ok := g.partialMessagePeers[topic]
if !ok {
return
}
var supportsPartial, doesNotSupportPartial float64
for p := range meshPeers {
if _, ok := partialPeers[p]; ok {
supportsPartial++
} else {
doesNotSupportPartial++
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
continue
}
pubCtr.WithLabelValues(*msg.Topic).Inc()
}
pubsubMeshPeers.WithLabelValues(topic, "true").Set(supportsPartial)
pubsubMeshPeers.WithLabelValues(topic, "false").Set(doesNotSupportPartial)
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
@@ -78,7 +77,6 @@ type Service struct {
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
partialColumnBroadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
@@ -149,10 +147,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
custodyInfoSet: make(chan struct{}),
}
if cfg.PartialDataColumns {
s.partialColumnBroadcaster = partialdatacolumnbroadcaster.NewBroadcaster(log.Logger)
}
ipAddr := prysmnetwork.IPAddr()
opts, err := s.buildOptions(ipAddr, s.privKey)
@@ -311,10 +305,6 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -324,10 +314,6 @@ func (s *Service) Stop() error {
if s.dv5Listener != nil {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}
@@ -364,10 +350,6 @@ func (s *Service) PubSub() *pubsub.PubSub {
return s.pubsub
}
func (s *Service) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return s.partialColumnBroadcaster
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {

Some files were not shown because too many files have changed in this diff Show More