Compare commits

..

18 Commits

Author SHA1 Message Date
Manu NALEPA
6397093627 foobar 2026-01-04 23:44:38 +01:00
Manu NALEPA
0db74365e0 Summarize "Accepted data column sidecars summary" log. (#16210)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**

**Before:**
```
[2026-01-02 13:29:50.13] DEBUG sync: Accepted data column sidecars summary columnIndices=[0 1 6 7 8 9 10 11 12 13 14 15 16 18 23 28 29 31 32 35 37 38 39 40 41 42 43 45 47 48 49 50 51 52 55 58 59 60 62 65 66 68 70 73 74 75 76 78 79 81 83 84 88 89 90 93 94 95 96 98 99 103 105 106 107 108 109 110 111 113 114 115 117 118 119 121 122] gossipScores=[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] peers=[rjzcRC oxj6o4 HCT2LE HCT2LE oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 HCT2LE HCT2LE aZAzfp HCT2LE HCT2LE oxj6o4 oxj6o4 oxj6o4 HCT2LE oxj6o4 oxj6o4 HCT2LE oxj6o4 HCT2LE oxj6o4 oxj6o4 oxj6o4 HCT2LE oxj6o4 oxj6o4 HCT2LE HCT2LE oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 HCT2LE oxj6o4 HCT2LE oxj6o4 oxj6o4 HCT2LE aZAzfp oxj6o4 oxj6o4 YdJQCg oxj6o4 oxj6o4 oxj6o4 HCT2LE oxj6o4 HCT2LE HCT2LE 5jMhEK HCT2LE oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 oxj6o4 HCT2LE rjzcRC oxj6o4 HCT2LE oxj6o4 oxj6o4 HCT2LE oxj6o4 oxj6o4 HCT2LE HCT2LE HCT2LE oxj6o4] receivedCount=77 sinceStartTimes=[869.00ms 845.00ms 797.00ms 795.00ms 805.00ms 906.00ms 844.00ms 849.00ms 843.00ms 844.00ms 821.00ms 796.00ms 794.00ms 796.00ms 838.00ms 842.00ms 843.00ms 848.00ms 795.00ms 820.00ms 797.00ms 830.00ms 801.00ms 794.00ms 925.00ms 924.00ms 935.00ms 843.00ms 802.00ms 796.00ms 802.00ms 798.00ms 794.00ms 796.00ms 796.00ms 843.00ms 802.00ms 830.00ms 826.00ms 796.00ms 819.00ms 801.00ms 852.00ms 877.00ms 876.00ms 843.00ms 843.00ms 844.00ms 1138.00ms 843.00ms 886.00ms 805.00ms 794.00ms 844.00ms 909.00ms 845.00ms 889.00ms 798.00ms 792.00ms 843.00ms 878.00ms 802.00ms 798.00ms 849.00ms 826.00ms 815.00ms 844.00ms 797.00ms 795.00ms 798.00ms 843.00ms 844.00ms 845.00ms 845.00ms 867.00ms 805.00ms 800.00ms] slot=2095599 validationTimes=[399.00ms 423.00ms 470.00ms 472.00ms 463.00ms 362.00ms 423.00ms 419.00ms 425.00ms 423.00ms 446.00ms 471.00ms 473.00ms 471.00ms 429.00ms 425.00ms 424.00ms 419.00ms 471.00ms 448.00ms 470.00ms 437.00ms 467.00ms 472.00ms 342.00ms 343.00ms 332.00ms 424.00ms 465.00ms 471.00ms 465.00ms 469.00ms 473.00ms 470.00ms 470.00ms 424.00ms 466.00ms 438.00ms 442.00ms 471.00ms 448.00ms 467.00ms 416.00ms 390.00ms 392.00ms 424.00ms 425.00ms 423.00ms 140.00ms 424.00ms 381.00ms 462.00ms 473.00ms 423.00ms 359.00ms 423.00ms 378.00ms 469.00ms 475.00ms 425.00ms 390.00ms 465.00ms 469.00ms 419.00ms 442.00ms 452.00ms 423.00ms 470.00ms 473.00ms 469.00ms 424.00ms 423.00ms 423.00ms 423.00ms 400.00ms 462.00ms 467.00ms]
```


**After:**
```
[2026-01-02 16:48:48.61] DEBUG sync: Accepted data column sidecars summary count=31 indices=0-1,3-5,7,21,24,27,29,36-37,46,48,55,57,66,70,76,82,89,93-94,97,99-101,113,120,124,126 root=0x409a4eac4761a3199f60dec0dfe50b6eed91e29d6c3671bb61704401906d2b69 sinceStartTime=[min: 512.181127ms, avg: 541.358688ms, max: 557.074707ms] slot=2096594 validationTime=[min: 13.357515ms, avg: 55.1343ms, max: 73.909889ms]
```

Distributions are still available on metrics:
<img width="792" height="309" alt="image"
src="https://github.com/user-attachments/assets/15128283-6740-4387-b205-41fb18205f54"
/>

<img width="799" height="322" alt="image"
src="https://github.com/user-attachments/assets/e0d602fa-db06-4cd3-8ec7-1ee2671c9921"
/>


**Which issues(s) does this PR fix?**

Fixes:
- https://github.com/OffchainLabs/prysm/issues/16208

**Other notes for review**

**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-02 17:09:30 +00:00
Potuz
6f90101364 Use proposer lookahead for data column verification (#16202)
Replace the proposer indices cache usage in data column sidecar
verification with direct state lookahead access. Since data column
sidecars require the Fulu fork, the state always has a ProposerLookahead
field that provides O(1) proposer index lookups for current and next
epoch.

This simplifies SidecarProposerExpected() by removing:
- Checkpoint-based proposer cache lookup
- Singleflight wrapper (not needed for O(1) access)
- Target root computation for cache keys

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 17:01:53 +00:00
Manu NALEPA
49e1763ec2 Data columns cache warmup: Parallelize computation of all files for a given epoch. (#16207)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
Before this PR, all `.sszs` files containing the data column sidecars
were read an process sequentially, taking some time.
After this PR, every `.sszs` files of a given epoch (so, up to 32 files
with the current `SLOT_PER_EPOCHS` value) are processed in parallel.

**Which issues(s) does this PR fix?**
- https://github.com/OffchainLabs/prysm/issues/16204

Tested on - [Netcup VPS 4000 G11](https://www.netcup.com/en/server/vps).
**Before this PR (3 trials)**:
```
[2026-01-02 08:55:12.71]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=1m22.894007534s
[2026-01-02 12:59:33.62]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=42.346732863s
[2026-01-02 13:03:13.65]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=56.143565960s
```

**After this PR (3 trials)**:
```
[2026-01-02 12:50:07.53]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.019424193s
[2026-01-02 12:52:01.34]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=1.960671225s
[2026-01-02 12:53:34.66]  INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.549555363s
```


**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-02 16:59:55 +00:00
Potuz
c2527c82cd Use a separate context when updating the NSC (#16209)
There is a race condition introduced in #16149 in which the update to
the NSC happens with a context that may be cancelled by the time the
routine is called. This PR starts a new context with a deadline to call
the routine in the background.

fixes #16205
2026-01-02 16:43:34 +00:00
Potuz
d4ea8fafd6 Call FCU in the background (#16149)
This PR introduces several simplifications to block processing.

It calls to notify the engine in the background when forkchoice needs to
be updated.

It no longer updates the caches and process epoch transition before
computing payload attributes, since this is no longer needed after Fulu.

It removes a complicated second call to FCU with the same head after
processing the last slot of the epoch.

Some checks for reviewers:

- the single caller of sendFCU held a lock to forkchoice. Since the call
now is in the background this helper can aquire the lock.
- All paths to handleEpochBoundary are now **NOT** locked. This allows
the lock to get the target root to be taken locally in place.
- The checkpoint cache is completely useless and thus the target root
call could be removed. But removing the proposer ID cache is more
complicated and out of scope for this PR.
- lateBlockTasks has pre and post-fulu cased, we could remove pre-fulu
checks and defer to the update function if deemed cleaner.
- Conversely, postBlockProcess does not have this casing and thus
pre-Fulu blocks on gossip may fail to get proposed correctly because of
the lack of the proposer being correctly computed.
2025-12-30 21:01:34 +00:00
kasey
07d1d6bdf9 Fix validation bug in --backfill-oldest-slot (#16173)
**What type of PR is this?**

Bug fix


**What does this PR do? Why is it needed?**

Validation of `--backfill-oldest-slot` fails for values > 1056767,
because the validation code is comparing the slot/32 to
`MIN_EPOCHS_FOR_BLOCK_REQUESTS` (33024), instead of comparing it to
`current_epoch - MIN_EPOCHS_FOR_BLOCK_REQUESTS`.

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2025-12-29 20:35:46 +00:00
Potuz
f938da99d9 Use head to validate atts for previous epoch (#16109)
In the event that the target checkpoint of an attestation is for the
previous epoch, and the head state has the same dependent root at that
epoch. The reason being that this guarantees that both seed and active
validator indices are guaranteed to be the same at the checkpoint's
epoch, from the point of view of the attester (even on a different
branch) and the head view.
2025-12-29 20:07:21 +00:00
Potuz
9deec69cc7 Do not verify block signature on block processing (#14820)
Verifying the block signature adds a batch and performs a full hash of
the block unnecessarily.
2025-12-29 19:52:38 +00:00
Potuz
2767f08f4d Do not send FCU on block batches (#16199)
On block batches the engine does not need to be notified of FCU, only on
regular sync at the end of sync it's useful to notify the engine.
2025-12-29 11:39:12 +00:00
Radosław Kapka
d46c620783 Extend httperror analyzer to more functions (#16186)
**What type of PR is this?**

Tooling

**What does this PR do? Why is it needed?**

Renames `httperror` analyzer to `httpwriter` and extends it to the
following functions:
- `WriteError`
- `WriteJson`
- `WriteSsz`

_**NOTE: The PR is currently red because the fix in
https://github.com/OffchainLabs/prysm/pull/16175 must be merged first**_

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-23 16:53:01 +00:00
sashass1315
dd05e44ef3 fix: avoid panic when fork schedule is empty (#16175)
SortedForkSchedule should never be empty for a properly initialized
network schedule, but the handler already had a branch to support an
empty result. Without an early return, we wrote a JSON response and then
still accessed schedule[0], which could panic and double-write the HTTP
response in misconfigured setups.

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-12-23 15:46:21 +00:00
satushh
9da36a5de6 Use HasPendingBalanceToWithdraw instead of PendingBalanceToWithdraw in ProcessConsolidationRequests (#16189)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Performance

**What does this PR do? Why is it needed?**

`PendingBalanceToWithdraw` was used to find the `bal` only to check
later if `bal` is greater than 0 or not. No need to calculate the full
balance and we could just check if `bal` is greater than 0 or not by
using an existing function `HasPendingBalanceToWithdraw`. So this should
help in reducing some unnecessary computation.

`HasPendingBalanceToWithdraw` returns immediately on first match of
non-zero instance, while `PendingBalanceToWithdraw` always iterates
through all entries to compute the sum.

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [ ] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [ ] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-22 18:16:14 +00:00
terence
7950a24926 feat(primitives): add BuilderIndex SSZ type (#16169)
This pr adds primitives.BuilderIndex for builder registry indexing in
Gloas
2025-12-20 04:29:42 +00:00
Potuz
ea51253be9 Do not process slots and copy state for payload attributes post Fulu (#16168)
When computing payload attributes post-Fulu, we do not need to process
slots, nor copy the state if we need to find out if the node is
proposing in the next slot. This prevents an immediate epoch processing
after block 31 is processed unless we are actually proposing.
2025-12-19 22:03:52 +00:00
Manu NALEPA
2ac30f5ce6 Pending aggregates: When multiple aggregated attestations only differing by the aggregator index are in the pending queue, only process one of them. (#16153)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
When an (potentially aggregated) attestation is received **before** the
block being voted for, Prysm queues this attestation, then processes the
queue when the block has been received.

This behavior is consistent with the [Phase0 specification
](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id).

> [IGNORE] The block being voted for
(attestation.data.beacon_block_root) has been seen (via gossip or
non-gossip sources) (a client MAY queue attestations for processing once
block is retrieved).

Once the block being voted for is processed, previously queued
(potentially aggregated) attestations are then processed, and
broadcasted.

Processing (potentially aggregated) attestations takes some non
negligible time. For this reason, (potentially aggregated) attestations
are deduplicated before being introduced into the pending queue, to
avoid eventually processing duplicates.

Before this PR, two aggregated attestations were considered duplicated
if all of the following conditions were gathered:
1. Attestations have the same version, 
2. **Attestations have the same aggregator index (aka., the same
validator aggregated them)**,
3. Attestations have the same slot, 
4. Attestations have the same committee index, and
5. Attestations have the same aggregation bits

Aggregated attestations are then broadcasted.
The final purpose of aggregated attestations is to be packed into the
next block by the next proposer.
When packing attestations, the aggregator index is not used any more.

This pull request modifies the deduplication function used in the
pending aggregated attestations queue by considering that multiple
aggregated attestations only differing by the aggregator index are
equivalent (removing `2.` of the previous list.)

As a consequence, the count of aggregated attestations to be introduced
in the pending queue is reduced from 1 aggregated attestation by
aggregator to, in the best case,
[MAX_COMMITTEE_PER_SLOT=64](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#misc-1).

Also, only a single aggregated attestation for a given version, slot,
committee index and aggregation bits will be re-broadcasted. This is a
correct behavior, since no data to be included in a block will be lost.
(We can even say that this will reduce by a bit the total networking
volume.)

**How to test**:
1. Start a beacon node (preferably, on a slow computer) from a
checkpoint.
2. Filter logs containing `Synced new block` and `Verified and saved
pending attestations to pool`. (You can pipe logs into `grep -E "Synced
new block|Verified and saved pending attestations to pool"`.

- In `Synced new block` logs, monitor the `sinceSlotStartTime` value.
This should monotonically decrease.
- In `Verified and saved pending attestations to pool`, monitor the
`pendingAggregateAttAndProofCount` value. It should be a "honest" value.
"honest" is not really quantifiable here, since it depends on the
aggregators. But it's likely to be less than
`5*MAX_COMMITTEE_PER_SLOT=320`.

**Which issues(s) does this PR fix?**

Partially fixes:
- https://github.com/OffchainLabs/prysm/issues/16160

**Other notes for review**
Please read commit by commit, with commit messages.
The important commit is b748c04a67.

**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-19 14:05:50 +00:00
Manu NALEPA
7418c00ad6 validateDataColumn: Remove error logs. (#16157)
**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
When we receive data column sidecars via gossip, if the sidecar does not
respect the validation rules, a scary ERROR log is displayed. We can't
to anything about it, since the error comes from an invalid incoming
sidecar, so there is no need to print an ERROR message.

Node: As all REJECTED gossip message, a DEBUG log is also always
displayed.

Example of ERROR log:
```
[2025-12-18 15:38:26.46] ERROR sync: Failed to decode message error=invalid ssz encoding. first variable element offset indexes into fixed value data
[2025-12-18 15:38:26.46] DEBUG sync: Gossip message was rejected agent=erigon/caplin error=invalid ssz encoding. first variable element offset indexes into fixed value data gossipScore=0 multiaddress=/ip4/141.147.32.105/tcp/9000 peerID=16Uiu2HAmHu88k97iBist1vJg7cPNuTjJFRARKvDF7yaH3Pv3Vmso topic=/eth2/c6ecb76c/data_column_sidecar_30/ssz_snappy
```

(After this PR, the DEBUG one will still be printed.)

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-18 16:18:02 +00:00
james-prysm
66342655fd throw 503 error when submit attestation and sync committee are called on syncing node + align changes to gRPC (#16152)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Bug fix


**What does this PR do? Why is it needed?**

Prysm starting throwing this error `Could not write response message"
error="write tcp 10.104.92.212:5052->10.104.92.196:41876: write: broken
pipe` because a validator got attestation data from a synced node and
submitted attestation to a syncing node, when the node couldn't replay
the state the validator context deadlined and disconnected but the
writer when it finally responded tries to write it gets this broken pipe
error.

applies to `/eth/v2/beacon/pool/attestations` and
`/eth/v1/beacon/pool/sync_committees`

the solution is 2 part.
1. we shouldn't allow submission of an attestation if the node is
syncing because we can't save the attestation without the state
information.
2. we were doing the expensive state call before broadcast before in
rest and now it should match gRPC where it happens afterward in its own
go routine.

Tested manually running kurtosis with rest validators

```
participants:
 # Super-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   count: 2
   supernode: true
   cl_extra_params:
     - --subscribe-all-subnets
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 # Full-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   validator_count: 63
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug
   validator_count: 13

additional_services:
 - dora
 - spamoor

spamoor_params:
 image: ethpandaops/spamoor:master
 max_mem: 4000
 spammers:
   - scenario: eoatx
     config:
       throughput: 200
   - scenario: blobs
     config:
       throughput: 20

network_params:
  fulu_fork_epoch: 2
  bpo_1_epoch: 8
  bpo_1_max_blobs: 21
  withdrawal_type: "0x02"
  preset: mainnet
  seconds_per_slot: 6

global_log_level: debug
```

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-18 15:07:09 +00:00
121 changed files with 1575 additions and 4211 deletions

View File

@@ -193,7 +193,7 @@ nogo(
"//tools/analyzers/featureconfig:go_default_library",
"//tools/analyzers/gocognit:go_default_library",
"//tools/analyzers/ineffassign:go_default_library",
"//tools/analyzers/httperror:go_default_library",
"//tools/analyzers/httpwriter:go_default_library",
"//tools/analyzers/interfacechecker:go_default_library",
"//tools/analyzers/logcapitalization:go_default_library",
"//tools/analyzers/logruswitherror:go_default_library",

View File

@@ -323,14 +323,17 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
var ok bool
e := slots.ToEpoch(slot)
stateEpoch := slots.ToEpoch(st.Slot())
if e == stateEpoch {
fuluAndNextEpoch := st.Version() >= version.Fulu && e == stateEpoch+1
if e == stateEpoch || fuluAndNextEpoch {
val, ok = s.trackedProposer(st, slot)
if !ok {
return emptyAttri
}
}
st = st.Copy()
if slot > st.Slot() {
// At this point either we know we are proposing on a future slot or we need to still compute the
// right proposer index pre-Fulu, either way we need to copy the state to process it.
st = st.Copy()
var err error
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, headRoot, slot)
if err != nil {
@@ -338,7 +341,7 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
return emptyAttri
}
}
if e > stateEpoch {
if e > stateEpoch && !fuluAndNextEpoch {
emptyAttri := payloadattribute.EmptyWithVersion(st.Version())
val, ok = s.trackedProposer(st, slot)
if !ok {

View File

@@ -1053,40 +1053,3 @@ func TestKZGCommitmentToVersionedHashes(t *testing.T) {
require.Equal(t, vhs[0].String(), vh0)
require.Equal(t, vhs[1].String(), vh1)
}
func TestComputePayloadAttribute(t *testing.T) {
service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache()))
ctx := tr.ctx
st, _ := util.DeterministicGenesisStateBellatrix(t, 1)
service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, Index: 0})
// Cache hit, advance state, no fee recipient
slot := primitives.Slot(1)
service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{})
blk := util.NewBeaconBlockBellatrix()
signed, err := consensusblocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(signed, [32]byte{'a'})
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
}
fcu := &fcuConfig{
headState: st,
proposingSlot: slot,
headRoot: [32]byte{},
}
require.NoError(t, service.computePayloadAttributes(cfg, fcu))
require.Equal(t, false, fcu.attributes.IsEmpty())
require.Equal(t, params.BeaconConfig().EthBurnAddressHex, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient()).String())
// Cache hit, advance state, has fee recipient
suggestedAddr := common.HexToAddress("123")
service.cfg.TrackedValidatorsCache.Set(cache.TrackedValidator{Active: true, FeeRecipient: primitives.ExecutionAddress(suggestedAddr), Index: 0})
service.cfg.PayloadIDCache.Set(slot, [32]byte{}, [8]byte{})
require.NoError(t, service.computePayloadAttributes(cfg, fcu))
require.Equal(t, false, fcu.attributes.IsEmpty())
require.Equal(t, suggestedAddr, common.BytesToAddress(fcu.attributes.SuggestedFeeRecipient()))
}

View File

@@ -12,6 +12,7 @@ import (
payloadattribute "github.com/OffchainLabs/prysm/v7/consensus-types/payload-attribute"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -53,58 +54,53 @@ type fcuConfig struct {
}
// sendFCU handles the logic to notify the engine of a forckhoice update
// for the first time when processing an incoming block during regular sync. It
// always updates the shuffling caches and handles epoch transitions when the
// incoming block is late, preparing payload attributes in this case while it
// only sends a message with empty attributes for early blocks.
func (s *Service) sendFCU(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
if !s.isNewHead(cfg.headRoot) {
return nil
// when processing an incoming block during regular sync. It
// always updates the shuffling caches and handles epoch transitions .
func (s *Service) sendFCU(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
if cfg.postState.Version() < version.Fulu {
// update the caches to compute the right proposer index
// this function is called under a forkchoice lock which we need to release.
s.ForkChoicer().Unlock()
s.updateCachesPostBlockProcessing(cfg)
s.ForkChoicer().Lock()
}
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return
}
// If head has not been updated and attributes are nil, we can skip the FCU.
if !s.isNewHead(cfg.headRoot) && (fcuArgs.attributes == nil || fcuArgs.attributes.IsEmpty()) {
return
}
// If we are proposing and we aim to reorg the block, we have already sent FCU with attributes on lateBlockTasks
if fcuArgs.attributes != nil && !fcuArgs.attributes.IsEmpty() && s.shouldOverrideFCU(cfg.headRoot, s.CurrentSlot()+1) {
return nil
return
}
if s.inRegularSync() {
go s.forkchoiceUpdateWithExecution(cfg.ctx, fcuArgs)
}
return s.forkchoiceUpdateWithExecution(cfg.ctx, fcuArgs)
}
// sendFCUWithAttributes computes the payload attributes and sends an FCU message
// to the engine if needed
func (s *Service) sendFCUWithAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
defer cancel()
cfg.ctx = slotCtx
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
if err := s.computePayloadAttributes(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not compute payload attributes")
return
}
if fcuArgs.attributes.IsEmpty() {
return
}
if _, err := s.notifyForkchoiceUpdate(cfg.ctx, fcuArgs); err != nil {
log.WithError(err).Error("Could not update forkchoice with payload attributes for proposal")
if s.isNewHead(fcuArgs.headRoot) {
if err := s.saveHead(cfg.ctx, fcuArgs.headRoot, fcuArgs.headBlock, fcuArgs.headState); err != nil {
log.WithError(err).Error("Could not save head")
}
s.pruneAttsFromPool(s.ctx, fcuArgs.headState, fcuArgs.headBlock)
}
}
// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It decides whether a new call to FCU should be made.
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) error {
// fockchoiceUpdateWithExecution is a wrapper around notifyForkchoiceUpdate. It gets a forkchoice lock and calls the engine.
// The caller of this function should NOT have a lock in forkchoice store.
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuConfig) {
_, span := trace.StartSpan(ctx, "beacon-chain.blockchain.forkchoiceUpdateWithExecution")
defer span.End()
// Note: Use the service context here to avoid the parent context being ended during a forkchoice update.
ctx = trace.NewContext(s.ctx, span)
s.ForkChoicer().Lock()
defer s.ForkChoicer().Unlock()
_, err := s.notifyForkchoiceUpdate(ctx, args)
if err != nil {
return errors.Wrap(err, "could not notify forkchoice update")
log.WithError(err).Error("Could not notify forkchoice update")
}
if err := s.saveHead(ctx, args.headRoot, args.headBlock, args.headState); err != nil {
log.WithError(err).Error("Could not save head")
}
// Only need to prune attestations from pool if the head has changed.
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)
return nil
}
// shouldOverrideFCU checks whether the incoming block is still subject to being

View File

@@ -97,7 +97,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
headBlock: wsb,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
service.forkchoiceUpdateWithExecution(ctx, args)
payloadID, has := service.cfg.PayloadIDCache.PayloadID(2, [32]byte{2})
require.Equal(t, true, has)
@@ -151,7 +151,7 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin
headRoot: r,
proposingSlot: service.CurrentSlot() + 1,
}
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, args))
service.forkchoiceUpdateWithExecution(ctx, args)
}
func TestShouldOverrideFCU(t *testing.T) {

View File

@@ -110,7 +110,7 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
ckzgCells := make([]ckzg4844.Cell, len(cells))
for i := range cells {
ckzgCells[i] = ckzg4844.Cell(cells[i])
copy(ckzgCells[i][:], cells[i][:])
}
return ckzg4844.VerifyCellKZGProofBatch(commitmentsBytes, cellIndices, ckzgCells, proofsBytes)
}

View File

@@ -22,7 +22,7 @@ import (
// The caller of this function must have a lock on forkchoice.
func (s *Service) getRecentPreState(ctx context.Context, c *ethpb.Checkpoint) state.ReadOnlyBeaconState {
headEpoch := slots.ToEpoch(s.HeadSlot())
if c.Epoch < headEpoch || c.Epoch == 0 {
if c.Epoch+1 < headEpoch || c.Epoch == 0 {
return nil
}
// Only use head state if the head state is compatible with the target checkpoint.
@@ -30,11 +30,13 @@ func (s *Service) getRecentPreState(ctx context.Context, c *ethpb.Checkpoint) st
if err != nil {
return nil
}
headDependent, err := s.cfg.ForkChoiceStore.DependentRootForEpoch([32]byte(headRoot), c.Epoch-1)
// headEpoch - 1 equals c.Epoch if c is from the previous epoch and equals c.Epoch - 1 if c is from the current epoch.
// We don't use the smaller c.Epoch - 1 because forkchoice would not have the data to answer that.
headDependent, err := s.cfg.ForkChoiceStore.DependentRootForEpoch([32]byte(headRoot), headEpoch-1)
if err != nil {
return nil
}
targetDependent, err := s.cfg.ForkChoiceStore.DependentRootForEpoch([32]byte(c.Root), c.Epoch-1)
targetDependent, err := s.cfg.ForkChoiceStore.DependentRootForEpoch([32]byte(c.Root), headEpoch-1)
if err != nil {
return nil
}
@@ -43,7 +45,7 @@ func (s *Service) getRecentPreState(ctx context.Context, c *ethpb.Checkpoint) st
}
// If the head state alone is enough, we can return it directly read only.
if c.Epoch == headEpoch {
if c.Epoch <= headEpoch {
st, err := s.HeadStateReadOnly(ctx)
if err != nil {
return nil

View File

@@ -170,12 +170,13 @@ func TestService_GetRecentPreState(t *testing.T) {
err = s.SetFinalizedCheckpoint(cp0)
require.NoError(t, err)
st, root, err := prepareForkchoiceState(ctx, 31, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
st, blk, err := prepareForkchoiceState(ctx, 31, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, root))
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
service.head = &head{
root: [32]byte(ckRoot),
state: s,
block: blk,
slot: 31,
}
require.NotNil(t, service.getRecentPreState(ctx, &ethpb.Checkpoint{Epoch: 1, Root: ckRoot}))
@@ -197,12 +198,13 @@ func TestService_GetRecentPreState_Old_Checkpoint(t *testing.T) {
err = s.SetFinalizedCheckpoint(cp0)
require.NoError(t, err)
st, root, err := prepareForkchoiceState(ctx, 33, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
st, blk, err := prepareForkchoiceState(ctx, 33, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, root))
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
service.head = &head{
root: [32]byte(ckRoot),
state: s,
block: blk,
slot: 33,
}
require.IsNil(t, service.getRecentPreState(ctx, &ethpb.Checkpoint{}))
@@ -227,6 +229,7 @@ func TestService_GetRecentPreState_Same_DependentRoots(t *testing.T) {
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
st, blk, err = prepareForkchoiceState(ctx, 64, [32]byte{'T'}, blk.Root(), [32]byte{}, cp0, cp0)
require.NoError(t, err)
headBlock := blk
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
st, blk, err = prepareForkchoiceState(ctx, 33, [32]byte{'U'}, [32]byte(ckRoot), [32]byte{}, cp0, cp0)
require.NoError(t, err)
@@ -235,8 +238,9 @@ func TestService_GetRecentPreState_Same_DependentRoots(t *testing.T) {
service.head = &head{
root: [32]byte{'T'},
state: s,
block: headBlock,
slot: 64,
state: s,
}
require.NotNil(t, service.getRecentPreState(ctx, &ethpb.Checkpoint{Epoch: 2, Root: cpRoot[:]}))
}
@@ -263,6 +267,7 @@ func TestService_GetRecentPreState_Different_DependentRoots(t *testing.T) {
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
st, blk, err = prepareForkchoiceState(ctx, 64, [32]byte{'U'}, blk.Root(), [32]byte{}, cp0, cp0)
require.NoError(t, err)
headBlock := blk
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
st, blk, err = prepareForkchoiceState(ctx, 33, [32]byte{'V'}, [32]byte(ckRoot), [32]byte{}, cp0, cp0)
require.NoError(t, err)
@@ -270,7 +275,8 @@ func TestService_GetRecentPreState_Different_DependentRoots(t *testing.T) {
cpRoot := blk.Root()
service.head = &head{
root: [32]byte{'T'},
root: [32]byte{'U'},
block: headBlock,
state: s,
slot: 64,
}
@@ -287,12 +293,13 @@ func TestService_GetRecentPreState_Different(t *testing.T) {
err = s.SetFinalizedCheckpoint(cp0)
require.NoError(t, err)
st, root, err := prepareForkchoiceState(ctx, 33, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
st, blk, err := prepareForkchoiceState(ctx, 33, [32]byte(ckRoot), [32]byte{}, [32]byte{'R'}, cp0, cp0)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, root))
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, blk))
service.head = &head{
root: [32]byte(ckRoot),
state: s,
block: blk,
slot: 33,
}
require.IsNil(t, service.getRecentPreState(ctx, &ethpb.Checkpoint{}))

View File

@@ -66,9 +66,6 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
startTime := time.Now()
fcuArgs := &fcuConfig{}
if s.inRegularSync() {
defer s.handleSecondFCUCall(cfg, fcuArgs)
}
if features.Get().EnableLightClient && slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().AltairForkEpoch {
defer s.processLightClientUpdates(cfg)
}
@@ -105,14 +102,17 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
s.logNonCanonicalBlockReceived(cfg.roblock.Root(), cfg.headRoot)
return nil
}
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
}
if err := s.sendFCU(cfg, fcuArgs); err != nil {
return errors.Wrap(err, "could not send FCU to engine")
}
s.sendFCU(cfg, fcuArgs)
// Pre-Fulu the caches are updated when computing the payload attributes
if cfg.postState.Version() >= version.Fulu {
go func() {
ctx, cancel := context.WithTimeout(s.ctx, slotDeadline)
defer cancel()
cfg.ctx = ctx
s.updateCachesPostBlockProcessing(cfg)
}()
}
return nil
}
@@ -295,14 +295,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return errors.Wrap(err, "could not set optimistic block to valid")
}
}
arg := &fcuConfig{
headState: preState,
headRoot: lastBR,
headBlock: lastB,
}
if _, err := s.notifyForkchoiceUpdate(ctx, arg); err != nil {
return err
}
return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
}
@@ -330,6 +322,7 @@ func (s *Service) areSidecarsAvailable(ctx context.Context, avs das.Availability
return nil
}
// the caller of this function must not hold a lock in forkchoice store.
func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
e := coreTime.CurrentEpoch(st)
if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {
@@ -359,7 +352,9 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
if e > 0 {
e = e - 1
}
s.ForkChoicer().RLock()
target, err := s.cfg.ForkChoiceStore.TargetRootForEpoch(r, e)
s.ForkChoicer().RUnlock()
if err != nil {
log.WithError(err).Error("Could not update proposer index state-root map")
return nil
@@ -372,7 +367,7 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
}
// Epoch boundary tasks: it copies the headState and updates the epoch boundary
// caches.
// caches. The caller of this function must not hold a lock in forkchoice store.
func (s *Service) handleEpochBoundary(ctx context.Context, slot primitives.Slot, headState state.BeaconState, blockRoot []byte) error {
ctx, span := trace.StartSpan(ctx, "blockChain.handleEpochBoundary")
defer span.End()
@@ -912,8 +907,6 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
if currentSlot == s.HeadSlot() {
return
}
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
// return early if we are in init sync
if !s.inRegularSync() {
return
@@ -926,14 +919,32 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
if lastState == nil {
lastRoot, lastState = headRoot[:], headState
}
// Copy all the field tries in our cached state in the event of late
// blocks.
lastState.CopyAllTries()
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("Could not update next slot state cache")
}
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
log.WithError(err).Error("Could not update epoch boundary caches")
// Before Fulu we need to process the next slot to find out if we are proposing.
if lastState.Version() < version.Fulu {
// Copy all the field tries in our cached state in the event of late
// blocks.
lastState.CopyAllTries()
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("Could not update next slot state cache")
}
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
log.WithError(err).Error("Could not update epoch boundary caches")
}
} else {
// After Fulu, we can update the caches asynchronously after sending FCU to the engine
defer func() {
go func() {
ctx, cancel := context.WithTimeout(s.ctx, slotDeadline)
defer cancel()
lastState.CopyAllTries()
if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("Could not update next slot state cache")
}
if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil {
log.WithError(err).Error("Could not update epoch boundary caches")
}
}()
}()
}
// return early if we already started building a block for the current
// head root
@@ -963,6 +974,8 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
headBlock: headBlock,
attributes: attribute,
}
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
_, err = s.notifyForkchoiceUpdate(ctx, fcuArgs)
if err != nil {
log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine")

View File

@@ -42,14 +42,8 @@ func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) er
if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil {
return err
}
if !s.inRegularSync() {
return nil
}
slot := cfg.roblock.Block().Slot()
if slots.WithinVotingWindow(s.genesisTime, slot) {
return nil
}
return s.computePayloadAttributes(cfg, fcuArgs)
fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:])
return nil
}
func (s *Service) getFCUArgsEarlyBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
@@ -173,26 +167,19 @@ func (s *Service) processLightClientUpdates(cfg *postBlockProcessConfig) {
// updateCachesPostBlockProcessing updates the next slot cache and handles the epoch
// boundary in order to compute the right proposer indices after processing
// state transition. This function is called on late blocks while still locked,
// before sending FCU to the engine.
func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) error {
// state transition. The caller of this function must not hold a lock in forkchoice store.
func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) {
slot := cfg.postState.Slot()
root := cfg.roblock.Root()
if err := transition.UpdateNextSlotCache(cfg.ctx, root[:], cfg.postState); err != nil {
return errors.Wrap(err, "could not update next slot state cache")
log.WithError(err).Error("Could not update next slot state cache")
return
}
if !slots.IsEpochEnd(slot) {
return nil
return
}
return s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, root[:])
}
// handleSecondFCUCall handles a second call to FCU when syncing a new block.
// This is useful when proposing in the next block and we want to defer the
// computation of the next slot shuffling.
func (s *Service) handleSecondFCUCall(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) {
if (fcuArgs.attributes == nil || fcuArgs.attributes.IsEmpty()) && cfg.headRoot == cfg.roblock.Root() {
go s.sendFCUWithAttributes(cfg, fcuArgs)
if err := s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, root[:]); err != nil {
log.WithError(err).Error("Could not handle epoch boundary")
}
}
@@ -202,20 +189,6 @@ func reportProcessingTime(startTime time.Time) {
onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
}
// computePayloadAttributes modifies the passed FCU arguments to
// contain the right payload attributes with the tracked proposer. It gets
// called on blocks that arrive after the attestation voting window, or in a
// background routine after syncing early blocks.
func (s *Service) computePayloadAttributes(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
if cfg.roblock.Root() == cfg.headRoot {
if err := s.updateCachesPostBlockProcessing(cfg); err != nil {
return err
}
}
fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:])
return nil
}
// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
// is in the correct time window.

View File

@@ -738,7 +738,9 @@ func TestOnBlock_CanFinalize_WithOnTick(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
require.NoError(t, err)
@@ -788,7 +790,9 @@ func TestOnBlock_CanFinalize(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
require.NoError(t, err)
@@ -816,25 +820,9 @@ func TestOnBlock_NilBlock(t *testing.T) {
service, tr := minimalTestService(t)
signed := &consensusblocks.SignedBeaconBlock{}
roblock := consensusblocks.ROBlock{ReadOnlySignedBeaconBlock: signed}
service.cfg.ForkChoiceStore.Lock()
err := service.postBlockProcess(&postBlockProcessConfig{tr.ctx, roblock, [32]byte{}, nil, true})
require.Equal(t, true, IsInvalidBlock(err))
}
func TestOnBlock_InvalidSignature(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx
gs, keys := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
blk, err := util.GenerateFullBlock(gs, keys, util.DefaultBlockGenConfig(), 1)
require.NoError(t, err)
blk.Signature = []byte{'a'} // Mutate the signature.
wsb, err := consensusblocks.NewSignedBeaconBlock(blk)
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
_, err = service.validateStateTransition(ctx, preState, wsb)
service.cfg.ForkChoiceStore.Unlock()
require.Equal(t, true, IsInvalidBlock(err))
}
@@ -866,7 +854,9 @@ func TestOnBlock_CallNewPayloadAndForkchoiceUpdated(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, r, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, r)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
testState, err = service.cfg.StateGen.StateByRoot(ctx, r)
require.NoError(t, err)
}
@@ -1339,7 +1329,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
lock.Lock()
roblock, err := consensusblocks.NewROBlockWithRoot(wsb1, r1)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
lock.Unlock()
wg.Done()
}()
@@ -1351,7 +1343,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
lock.Lock()
roblock, err := consensusblocks.NewROBlockWithRoot(wsb2, r2)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
lock.Unlock()
wg.Done()
}()
@@ -1363,7 +1357,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
lock.Lock()
roblock, err := consensusblocks.NewROBlockWithRoot(wsb3, r3)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
lock.Unlock()
wg.Done()
}()
@@ -1375,7 +1371,9 @@ func TestOnBlock_ProcessBlocksParallel(t *testing.T) {
lock.Lock()
roblock, err := consensusblocks.NewROBlockWithRoot(wsb4, r4)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
lock.Unlock()
wg.Done()
}()
@@ -1400,197 +1398,6 @@ func Test_verifyBlkFinalizedSlot_invalidBlock(t *testing.T) {
require.Equal(t, true, IsInvalidBlock(err))
}
// See the description in #10777 and #10782 for the full setup
// We sync optimistically a chain of blocks. Block 17 is the last block in Epoch
// 2. Block 18 justifies block 12 (the first in Epoch 2) and Block 19 returns
// INVALID from FCU, with LVH block 17. No head is viable. We check
// that the node is optimistic and that we can actually import a block on top of
// 17 and recover.
func TestStore_NoViableHead_FCU(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.SlotsPerEpoch = 6
config.AltairForkEpoch = 1
config.BellatrixForkEpoch = 2
params.OverrideBeaconConfig(config)
mockEngine := &mockExecution.EngineClient{ErrNewPayload: execution.ErrAcceptedSyncingPayloadStatus, ErrForkchoiceUpdated: execution.ErrAcceptedSyncingPayloadStatus}
service, tr := minimalTestService(t, WithExecutionEngineCaller(mockEngine))
ctx := tr.ctx
st, keys := util.DeterministicGenesisState(t, 64)
stateRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err, "Could not hash genesis state")
require.NoError(t, service.saveGenesisData(ctx, st))
genesis := blocks.NewGenesisBlock(stateRoot[:])
wsb, err := consensusblocks.NewSignedBeaconBlock(genesis)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb), "Could not save genesis block")
parentRoot, err := genesis.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root")
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, parentRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, parentRoot), "Could not save genesis state")
for i := 1; i < 6; i++ {
driftGenesisTime(service, primitives.Slot(i), 0)
st, err := service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlock(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
require.NoError(t, err)
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
}
for i := 6; i < 12; i++ {
driftGenesisTime(service, primitives.Slot(i), 0)
st, err := service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlockAltair(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
require.NoError(t, err)
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
}
for i := 12; i < 18; i++ {
driftGenesisTime(service, primitives.Slot(i), 0)
st, err := service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlockBellatrix(st, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
require.NoError(t, err)
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
}
// Check that we haven't justified the second epoch yet
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
require.Equal(t, primitives.Epoch(0), jc.Epoch)
// import a block that justifies the second epoch
driftGenesisTime(service, 18, 0)
validHeadState, err := service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlockBellatrix(validHeadState, keys, util.DefaultBlockGenConfig(), 18)
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
firstInvalidRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, firstInvalidRoot)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint()
require.Equal(t, primitives.Epoch(2), jc.Epoch)
sjc := validHeadState.CurrentJustifiedCheckpoint()
require.Equal(t, primitives.Epoch(0), sjc.Epoch)
lvh := b.Block.Body.ExecutionPayload.ParentHash
// check our head
require.Equal(t, firstInvalidRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
// import another block to find out that it was invalid
mockEngine = &mockExecution.EngineClient{ErrNewPayload: execution.ErrAcceptedSyncingPayloadStatus, ErrForkchoiceUpdated: execution.ErrInvalidPayloadStatus, ForkChoiceUpdatedResp: lvh}
service.cfg.ExecutionEngineCaller = mockEngine
driftGenesisTime(service, 19, 0)
st, err = service.HeadState(ctx)
require.NoError(t, err)
b, err = util.GenerateFullBlockBellatrix(st, keys, util.DefaultBlockGenConfig(), 19)
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err = service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err = service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.ErrorContains(t, "received an INVALID payload from execution engine", err)
// Check that forkchoice's head is the last invalid block imported. The
// store's headroot is the previous head (since the invalid block did
// not finish importing) one and that the node is optimistic
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
headRoot, err := service.HeadRoot(ctx)
require.NoError(t, err)
require.Equal(t, firstInvalidRoot, bytesutil.ToBytes32(headRoot))
optimistic, err := service.IsOptimistic(ctx)
require.NoError(t, err)
require.Equal(t, true, optimistic)
// import another block based on the last valid head state
mockEngine = &mockExecution.EngineClient{}
service.cfg.ExecutionEngineCaller = mockEngine
driftGenesisTime(service, 20, 0)
b, err = util.GenerateFullBlockBellatrix(validHeadState, keys, &util.BlockGenConfig{}, 20)
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err = b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err = service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err = service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
require.NoError(t, err)
// Check the newly imported block is head, it justified the right
// checkpoint and the node is no longer optimistic
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
sjc = service.CurrentJustifiedCheckpt()
require.Equal(t, jc.Epoch, sjc.Epoch)
require.Equal(t, jc.Root, bytesutil.ToBytes32(sjc.Root))
optimistic, err = service.IsOptimistic(ctx)
require.NoError(t, err)
require.Equal(t, false, optimistic)
}
// See the description in #10777 and #10782 for the full setup
// We sync optimistically a chain of blocks. Block 17 is the last block in Epoch
// 2. Block 18 justifies block 12 (the first in Epoch 2) and Block 19 returns
@@ -1642,7 +1449,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
for i := 6; i < 12; i++ {
@@ -1662,8 +1471,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
for i := 12; i < 18; i++ {
@@ -1684,8 +1494,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
// Check that we haven't justified the second epoch yet
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
@@ -1708,7 +1519,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, firstInvalidRoot, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, firstInvalidRoot)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
jc = service.cfg.ForkChoiceStore.JustifiedCheckpoint()
require.Equal(t, primitives.Epoch(2), jc.Epoch)
@@ -1718,6 +1531,10 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
lvh := b.Block.Body.ExecutionPayload.ParentHash
// check our head
require.Equal(t, firstInvalidRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
isBlock18OptimisticAfterImport, err := service.IsOptimisticForRoot(ctx, firstInvalidRoot)
require.NoError(t, err)
require.Equal(t, true, isBlock18OptimisticAfterImport)
time.Sleep(20 * time.Millisecond) // wait for async forkchoice update to be processed
// import another block to find out that it was invalid
mockEngine = &mockExecution.EngineClient{ErrNewPayload: execution.ErrInvalidPayloadStatus, NewPayloadResp: lvh}
@@ -1768,7 +1585,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
// Check the newly imported block is head, it justified the right
// checkpoint and the node is no longer optimistic
@@ -1835,7 +1654,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
for i := 6; i < 12; i++ {
@@ -1856,8 +1677,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
// import the merge block
@@ -1877,7 +1699,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, lastValidRoot)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
// save the post state and the payload Hash of this block since it will
// be the LVH
@@ -1906,8 +1730,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, invalidRoots[i-13], wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, invalidRoots[i-13])
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
// Check that we have justified the second epoch
jc := service.cfg.ForkChoiceStore.JustifiedCheckpoint()
@@ -1975,7 +1800,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true}))
service.cfg.ForkChoiceStore.Unlock()
// Check that the head is still INVALID and the node is still optimistic
require.Equal(t, invalidHeadRoot, service.cfg.ForkChoiceStore.CachedHeadRoot())
optimistic, err = service.IsOptimistic(ctx)
@@ -2000,7 +1827,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
st, err = service.cfg.StateGen.StateByRoot(ctx, root)
require.NoError(t, err)
@@ -2028,7 +1857,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, true})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
require.Equal(t, root, service.cfg.ForkChoiceStore.CachedHeadRoot())
sjc = service.CurrentJustifiedCheckpt()
@@ -2072,7 +1903,6 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot), "Could not save genesis state")
for i := 1; i < 6; i++ {
t.Log(i)
driftGenesisTime(service, primitives.Slot(i), 0)
st, err := service.HeadState(ctx)
require.NoError(t, err)
@@ -2089,7 +1919,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
for i := 6; i < 12; i++ {
@@ -2109,8 +1941,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
}
// import the merge block
@@ -2130,7 +1963,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, lastValidRoot, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, lastValidRoot)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, err)
// save the post state and the payload Hash of this block since it will
// be the LVH
@@ -2161,7 +1996,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
require.NoError(t, service.updateJustificationOnBlock(ctx, preState, postState, currStoreJustifiedEpoch))
_, err = service.updateFinalizationOnBlock(ctx, preState, postState, currStoreFinalizedEpoch)
require.NoError(t, err)
@@ -2282,7 +2119,9 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
st, err = service.HeadState(ctx)
require.NoError(t, err)
@@ -2348,7 +2187,9 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
st, err = service.HeadState(ctx)
require.NoError(t, err)
@@ -2631,7 +2472,10 @@ func TestRollbackBlock(t *testing.T) {
require.NoError(t, err)
// Rollback block insertion into db and caches.
require.ErrorContains(t, fmt.Sprintf("could not insert block %d to fork choice store", roblock.Block().Slot()), service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false})
service.cfg.ForkChoiceStore.Unlock()
require.ErrorContains(t, fmt.Sprintf("could not insert block %d to fork choice store", roblock.Block().Slot()), err)
// The block should no longer exist.
require.Equal(t, false, service.cfg.BeaconDB.HasBlock(ctx, root))
@@ -2732,7 +2576,9 @@ func TestRollbackBlock_ContextDeadline(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
b, err = util.GenerateFullBlock(postState, keys, util.DefaultBlockGenConfig(), 34)
require.NoError(t, err)
@@ -2766,7 +2612,10 @@ func TestRollbackBlock_ContextDeadline(t *testing.T) {
require.NoError(t, postState.SetFinalizedCheckpoint(cj))
// Rollback block insertion into db and caches.
require.ErrorContains(t, "context canceled", service.postBlockProcess(&postBlockProcessConfig{cancCtx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(&postBlockProcessConfig{cancCtx, roblock, [32]byte{}, postState, false})
service.cfg.ForkChoiceStore.Unlock()
require.ErrorContains(t, "context canceled", err)
// The block should no longer exist.
require.Equal(t, false, service.cfg.BeaconDB.HasBlock(ctx, root))
@@ -3262,7 +3111,9 @@ func Test_postBlockProcess_EventSending(t *testing.T) {
}
// Execute postBlockProcess
service.cfg.ForkChoiceStore.Lock()
err = service.postBlockProcess(cfg)
service.cfg.ForkChoiceStore.Unlock()
// Check error expectation
if tt.expectError {

View File

@@ -156,13 +156,15 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
}
if s.inRegularSync() {
fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:])
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return
}
go s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs)
}
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return
}
if err := s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs); err != nil {
log.WithError(err).Error("Could not update forkchoice")
if err := s.saveHead(s.ctx, fcuArgs.headRoot, fcuArgs.headBlock, fcuArgs.headState); err != nil {
log.WithError(err).Error("Could not save head")
}
s.pruneAttsFromPool(s.ctx, fcuArgs.headState, fcuArgs.headBlock)
}
// This processes fork choice attestations from the pool to account for validator votes and fork choice.

View File

@@ -117,7 +117,9 @@ func TestService_ProcessAttestationsAndUpdateHead(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState))
roblock, err := blocks.NewROBlockWithRoot(wsb, tRoot)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
copied, err = service.cfg.StateGen.StateByRoot(ctx, tRoot)
require.NoError(t, err)
require.Equal(t, 2, fcs.NodeCount())
@@ -177,7 +179,9 @@ func TestService_UpdateHead_NoAtts(t *testing.T) {
require.NoError(t, service.savePostStateInfo(ctx, tRoot, wsb, postState))
roblock, err := blocks.NewROBlockWithRoot(wsb, tRoot)
require.NoError(t, err)
service.cfg.ForkChoiceStore.Lock()
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))
service.cfg.ForkChoiceStore.Unlock()
require.Equal(t, 2, fcs.NodeCount())
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
require.Equal(t, tRoot, service.head.root)

View File

@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -290,52 +290,3 @@ func TestProcessBlockHeader_OK(t *testing.T) {
}
assert.Equal(t, true, proto.Equal(nsh, expected), "Expected %v, received %v", expected, nsh)
}
func TestBlockSignatureSet_OK(t *testing.T) {
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount)
for i := range validators {
validators[i] = &ethpb.Validator{
PublicKey: make([]byte, 32),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: true,
}
}
state, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, state.SetValidators(validators))
require.NoError(t, state.SetSlot(10))
require.NoError(t, state.SetLatestBlockHeader(util.HydrateBeaconHeader(&ethpb.BeaconBlockHeader{
Slot: 9,
ProposerIndex: 0,
})))
latestBlockSignedRoot, err := state.LatestBlockHeader().HashTreeRoot()
require.NoError(t, err)
currentEpoch := time.CurrentEpoch(state)
priv, err := bls.RandKey()
require.NoError(t, err)
pID, err := helpers.BeaconProposerIndex(t.Context(), state)
require.NoError(t, err)
block := util.NewBeaconBlock()
block.Block.Slot = 10
block.Block.ProposerIndex = pID
block.Block.Body.RandaoReveal = bytesutil.PadTo([]byte{'A', 'B', 'C'}, 96)
block.Block.ParentRoot = latestBlockSignedRoot[:]
block.Signature, err = signing.ComputeDomainAndSign(state, currentEpoch, block.Block, params.BeaconConfig().DomainBeaconProposer, priv)
require.NoError(t, err)
proposerIdx, err := helpers.BeaconProposerIndex(t.Context(), state)
require.NoError(t, err)
validators[proposerIdx].Slashed = false
validators[proposerIdx].PublicKey = priv.PublicKey().Marshal()
err = state.UpdateValidatorAtIndex(proposerIdx, validators[proposerIdx])
require.NoError(t, err)
set, err := blocks.BlockSignatureBatch(state, block.Block.ProposerIndex, block.Signature, block.Block.HashTreeRoot)
require.NoError(t, err)
verified, err := set.Verify()
require.NoError(t, err)
assert.Equal(t, true, verified, "Block signature set returned a set which was unable to be verified")
}

View File

@@ -122,24 +122,6 @@ func VerifyBlockSignatureUsingCurrentFork(beaconState state.ReadOnlyBeaconState,
return nil
}
// BlockSignatureBatch retrieves the block signature batch from the provided block and its corresponding state.
func BlockSignatureBatch(beaconState state.ReadOnlyBeaconState,
proposerIndex primitives.ValidatorIndex,
sig []byte,
rootFunc func() ([32]byte, error)) (*bls.SignatureBatch, error) {
currentEpoch := slots.ToEpoch(beaconState.Slot())
domain, err := signing.Domain(beaconState.Fork(), currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return nil, err
}
proposer, err := beaconState.ValidatorAtIndex(proposerIndex)
if err != nil {
return nil, err
}
proposerPubKey := proposer.PublicKey
return signing.BlockSignatureBatch(proposerPubKey, sig, domain, rootFunc)
}
// RandaoSignatureBatch retrieves the relevant randao specific signature batch object
// from a block and its corresponding state.
func RandaoSignatureBatch(

View File

@@ -278,12 +278,12 @@ func ProcessConsolidationRequests(ctx context.Context, st state.BeaconState, req
if uint64(curEpoch) < e {
continue
}
bal, err := st.PendingBalanceToWithdraw(srcIdx)
hasBal, err := st.HasPendingBalanceToWithdraw(srcIdx)
if err != nil {
log.WithError(err).Error("Failed to fetch pending balance to withdraw")
continue
}
if bal > 0 {
if hasBal {
continue
}

View File

@@ -33,7 +33,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -1,15 +1,11 @@
package peerdas
import (
stderrors "errors"
"iter"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/container/trie"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
)
@@ -20,7 +16,6 @@ var (
ErrIndexTooLarge = errors.New("column index is larger than the specified columns count")
ErrNoKzgCommitments = errors.New("no KZG commitments found")
ErrMismatchLength = errors.New("mismatch in the length of the column, commitments or proofs")
ErrEmptySegment = errors.New("empty segment in batch")
ErrInvalidKZGProof = errors.New("invalid KZG proof")
ErrBadRootLength = errors.New("bad root length")
ErrInvalidInclusionProof = errors.New("invalid inclusion proof")
@@ -62,136 +57,62 @@ func VerifyDataColumnSidecar(sidecar blocks.RODataColumn) error {
return nil
}
// CellProofBundleSegment is returned when a batch fails. The caller can call
// the `.Verify` method to verify just this segment.
type CellProofBundleSegment struct {
indices []uint64 // column index i.e in [0-127] for each cell
commitments []kzg.Bytes48 // KZG commitment for the blob that contains the cell that needs to be verified
cells []kzg.Cell // actual cell data for each cell (2KB each)
proofs []kzg.Bytes48 // Cell proof for each cell
// Note: All the above arrays are of the same length.
}
// Verify verifies this segment without batching.
func (s CellProofBundleSegment) Verify() error {
if len(s.cells) == 0 {
return ErrEmptySegment
}
verified, err := kzg.VerifyCellKZGProofBatch(s.commitments, s.indices, s.cells, s.proofs)
if err != nil {
return stderrors.Join(err, ErrInvalidKZGProof)
}
if !verified {
return ErrInvalidKZGProof
}
return nil
}
func VerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIter iter.Seq[blocks.CellProofBundle]) error {
// The BatchVerifyDataColumnsCellsKZGProofs call also returns the failed segments if beatch verification fails
// so we can verify each segment seperately to identify the failed cell ("BatchVerifyDataColumnsCellsKZGProofs" is all or nothing).
// But we ignore the failed segment list since we are just passing in one segment.
_, err := BatchVerifyDataColumnsCellsKZGProofs(sizeHint, []iter.Seq[blocks.CellProofBundle]{cellProofsIter})
return err
}
// BatchVerifyDataColumnsCellsKZGProofs verifies if the KZG proofs are correct.
// VerifyDataColumnsSidecarKZGProofs verifies if the KZG proofs are correct.
// Note: We are slightly deviating from the specification here:
// The specification verifies the KZG proofs for each sidecar separately,
// while we are verifying all the KZG proofs from multiple sidecars in a batch.
// This is done to improve performance since the internal KZG library is way more
// efficient when verifying in batch. If the batch fails, the failed segments
// are returned to the caller so that they may try segment by segment without
// batching. On success the failed segment list is empty.
//
// efficient when verifying in batch.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs
// sizeHint is the total number of cells to verify across all sidecars.
func BatchVerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIters []iter.Seq[blocks.CellProofBundle]) ( /* failed segment list */ []CellProofBundleSegment, error) {
commitments := make([]kzg.Bytes48, 0, sizeHint)
indices := make([]uint64, 0, sizeHint)
cells := make([]kzg.Cell, 0, sizeHint)
proofs := make([]kzg.Bytes48, 0, sizeHint)
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) error {
// Compute the total count.
count := 0
for _, sidecar := range sidecars {
count += len(sidecar.Column)
}
var anySegmentEmpty bool
var segments []CellProofBundleSegment
for _, cellProofsIter := range cellProofsIters {
startIdx := len(cells) // starts at 0, and increments by the number of cells in the current segment
for bundle := range cellProofsIter {
commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)
for _, sidecar := range sidecars {
for i := range sidecar.Column {
var (
commitment kzg.Bytes48
cell kzg.Cell
proof kzg.Bytes48
)
// sanity check that each commitment is 48 bytes, each cell is 2KB and each proof is 48 bytes.
if len(bundle.Commitment) != len(commitment) ||
len(bundle.Cell) != len(cell) ||
len(bundle.Proof) != len(proof) {
return nil, ErrMismatchLength
commitmentBytes := sidecar.KzgCommitments[i]
cellBytes := sidecar.Column[i]
proofBytes := sidecar.KzgProofs[i]
if len(commitmentBytes) != len(commitment) ||
len(cellBytes) != len(cell) ||
len(proofBytes) != len(proof) {
return ErrMismatchLength
}
copy(commitment[:], bundle.Commitment)
copy(cell[:], bundle.Cell)
copy(proof[:], bundle.Proof)
copy(commitment[:], commitmentBytes)
copy(cell[:], cellBytes)
copy(proof[:], proofBytes)
commitments = append(commitments, commitment)
indices = append(indices, bundle.ColumnIndex)
indices = append(indices, sidecar.Index)
cells = append(cells, cell)
proofs = append(proofs, proof)
}
if len(cells[startIdx:]) == 0 {
// this basically means that the current segment i.e. the current "bundle" we're
// iterating over is empty because "startIdx" did not increment after the last iteration.
anySegmentEmpty = true
}
segments = append(segments, CellProofBundleSegment{
indices: indices[startIdx:],
commitments: commitments[startIdx:],
cells: cells[startIdx:],
proofs: proofs[startIdx:],
})
}
if anySegmentEmpty {
// if any segment is empty, we don't verify anything as it's a malformed batch and we return all
// the segments here so caller can verify each segment seperately.
return segments, ErrEmptySegment
}
// Batch verify that the cells match the corresponding commitments and proofs.
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
if err != nil {
return segments, stderrors.Join(err, ErrInvalidKZGProof)
return errors.Wrap(err, "verify cell KZG proof batch")
}
if !verified {
return segments, ErrInvalidKZGProof
}
return nil, nil
}
// verifyKzgCommitmentsInclusionProof is the shared implementation for inclusion proof verification.
func verifyKzgCommitmentsInclusionProof(bodyRoot []byte, kzgCommitments [][]byte, inclusionProof [][]byte) error {
if len(bodyRoot) != fieldparams.RootLength {
return ErrBadRootLength
}
leaves := blocks.LeavesFromCommitments(kzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(bodyRoot, hashTreeRoot[:], kzgPosition, inclusionProof)
if !verified {
return ErrInvalidInclusionProof
return ErrInvalidKZGProof
}
return nil
@@ -203,23 +124,30 @@ func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
if sidecar.SignedBlockHeader == nil || sidecar.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
return verifyKzgCommitmentsInclusionProof(
sidecar.SignedBlockHeader.Header.BodyRoot,
sidecar.KzgCommitments,
sidecar.KzgCommitmentsInclusionProof,
)
}
// VerifyPartialDataColumnHeaderInclusionProof verifies if the KZG commitments are included in the beacon block.
func VerifyPartialDataColumnHeaderInclusionProof(header *ethpb.PartialDataColumnHeader) error {
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
root := sidecar.SignedBlockHeader.Header.BodyRoot
if len(root) != fieldparams.RootLength {
return ErrBadRootLength
}
return verifyKzgCommitmentsInclusionProof(
header.SignedBlockHeader.Header.BodyRoot,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
leaves := blocks.LeavesFromCommitments(sidecar.KzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
return errors.Wrap(err, "generate trie from items")
}
hashTreeRoot, err := sparse.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(root, hashTreeRoot[:], kzgPosition, sidecar.KzgCommitmentsInclusionProof)
if !verified {
return ErrInvalidInclusionProof
}
return nil
}
// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar.

View File

@@ -3,7 +3,6 @@ package peerdas_test
import (
"crypto/rand"
"fmt"
"iter"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
@@ -73,7 +72,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0] = sidecars[0].Column[0][:len(sidecars[0].Column[0])-1] // Remove one byte to create size mismatch
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
@@ -81,15 +80,14 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
failedSegments, err := peerdas.BatchVerifyDataColumnsCellsKZGProofs(blobCount, []iter.Seq[blocks.CellProofBundle]{blocks.RODataColumnsToCellProofBundles(sidecars)})
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
require.NoError(t, err)
require.Equal(t, 0, len(failedSegments))
})
}
@@ -275,7 +273,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -310,7 +308,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
}
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(allSidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
b.StopTimer()
require.NoError(b, err)
}
@@ -343,7 +341,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for _, sidecars := range allSidecars {
b.StartTimer()
err := peerdas.VerifyDataColumnsCellsKZGProofs(len(allSidecars), blocks.RODataColumnsToCellProofBundles(sidecars))
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
b.StopTimer()
require.NoError(b, err)
}

View File

@@ -5,7 +5,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -340,10 +339,7 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg
}
// ComputeCellsAndProofsFromStructured computes the cells and proofs from blobs and cell proofs.
// commitmentCount (i.e. number of blobs) is required to return the correct sized bitlist even if we see a nil slice of blobsAndProofs.
// Returns the bitlist of which blobs are present, the cells for each blob that is present, and the proofs for each cell in each blob that is present.
// The returned cells and proofs are compacted and will not contain entries for missing blobs.
func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs []*pb.BlobAndProofV2) (bitfield.Bitlist /* parts included */, [][]kzg.Cell, [][]kzg.Proof, error) {
func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([][]kzg.Cell, [][]kzg.Proof, error) {
start := time.Now()
defer func() {
cellsAndProofsFromStructuredComputationTime.Observe(float64(time.Since(start).Milliseconds()))
@@ -351,27 +347,16 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
var wg errgroup.Group
var blobsPresent int
for _, blobAndProof := range blobsAndProofs {
if blobAndProof != nil {
blobsPresent++
}
}
cellsPerBlob := make([][]kzg.Cell, blobsPresent) // array of cells for each blob
proofsPerBlob := make([][]kzg.Proof, blobsPresent) // array of proofs for each blob
included := bitfield.NewBitlist(commitmentCount) // bitlist of which blobs are present
cellsPerBlob := make([][]kzg.Cell, len(blobsAndProofs))
proofsPerBlob := make([][]kzg.Proof, len(blobsAndProofs))
var j int
for i, blobAndProof := range blobsAndProofs {
if blobAndProof == nil {
continue
return nil, nil, ErrNilBlobAndProof
}
included.SetBitAt(uint64(i), true) // blob at index i is present
compactIndex := j // compact index is the index of the blob in the returned arrays after accounting for nil/missing blocks.
wg.Go(func() error {
var kzgBlob kzg.Blob
// the number of copied bytes should be equal to the expected size of a blob.
if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) {
return errors.New("wrong blob size - should never happen")
}
@@ -396,18 +381,17 @@ func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs
kzgProofs = append(kzgProofs, kzgProof)
}
cellsPerBlob[compactIndex] = cells
proofsPerBlob[compactIndex] = kzgProofs
cellsPerBlob[i] = cells
proofsPerBlob[i] = kzgProofs
return nil
})
j++
}
if err := wg.Wait(); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// ReconstructBlobs reconstructs blobs from data column sidecars without computing KZG proofs or creating sidecars.

View File

@@ -479,9 +479,8 @@ func TestComputeCellsAndProofsFromFlat(t *testing.T) {
func TestComputeCellsAndProofsFromStructured(t *testing.T) {
t.Run("nil blob and proof", func(t *testing.T) {
included, _, _, err := peerdas.ComputeCellsAndProofsFromStructured(0, []*pb.BlobAndProofV2{nil})
require.NoError(t, err)
require.Equal(t, uint64(0), included.Count())
_, _, err := peerdas.ComputeCellsAndProofsFromStructured([]*pb.BlobAndProofV2{nil})
require.ErrorIs(t, err, peerdas.ErrNilBlobAndProof)
})
t.Run("nominal", func(t *testing.T) {
@@ -534,8 +533,7 @@ func TestComputeCellsAndProofsFromStructured(t *testing.T) {
require.NoError(t, err)
// Test ComputeCellsAndProofs
included, actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(blobsAndProofs)), blobsAndProofs)
require.Equal(t, included.Count(), uint64(len(actualCellsPerBlob)))
actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobsAndProofs)
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsPerBlob))

View File

@@ -3,7 +3,6 @@ package peerdas
import (
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -144,51 +143,6 @@ func DataColumnSidecars(cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof,
return roSidecars, nil
}
// Note: cellsPerBlob
// We should also assert that "included.Count() (i.e. number of bits set) == uint64(len(cellsPerBlob))" otherwise "cells[idx] = cells[idx][1:]" below will be out of bounds.
func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof, src ConstructionPopulator) ([]blocks.PartialDataColumn, error) {
start := time.Now()
const numberOfColumns = uint64(fieldparams.NumberOfColumns)
// rotate the cells and proofs from being per blob(i.e. "row indexed") to being per column ("column indexed").
// The returned arrays are of size "numberOfColumns" in length and the "included" bitfield is used to filter out cells that are not present in each column.
cells, proofs, err := rotateRowsToCols(cellsPerBlob, proofsPerBlob, numberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
dataColumns := make([]blocks.PartialDataColumn, 0, numberOfColumns)
for idx := range numberOfColumns {
dc, err := blocks.NewPartialDataColumn(info.signedBlockHeader, idx, info.kzgCommitments, info.kzgInclusionProof)
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
// info.kzgCommitments is the array of KZG commitments for each blob in the block so it's length is the number of blobs in the block.
// The included bitlist is the bitlist of which blobs are present i.e. which are the blobs we have the blob data for.
// we're now iterating over each row and extending the column one cell at a time for each blob that is present.
for i := range len(info.kzgCommitments) {
if !included.BitAt(uint64(i)) {
continue
}
// TODO: Check for "out of bounds" here. How do we know that cells always have upto "numberOfColumns" entries?
// Okay, this probably works because "rotateRowsToCols" above allocates "numberOfColumns" entries for each cell and proof.
// The "included" bitlist is used to filter out cells that are not present in each column so we only set the cells that are present in
// this call to "ExtendFromVerfifiedCell".
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
dataColumns = append(dataColumns, dc)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return dataColumns, nil
}
// Slot returns the slot of the source
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()

View File

@@ -182,12 +182,6 @@ func ProcessBlockNoVerifyAnySig(
return nil, nil, err
}
sig := signed.Signature()
bSet, err := b.BlockSignatureBatch(st, blk.ProposerIndex(), sig[:], blk.HashTreeRoot)
if err != nil {
tracing.AnnotateError(span, err)
return nil, nil, errors.Wrap(err, "could not retrieve block signature set")
}
randaoReveal := signed.Block().Body().RandaoReveal()
rSet, err := b.RandaoSignatureBatch(ctx, st, randaoReveal[:])
if err != nil {
@@ -201,7 +195,7 @@ func ProcessBlockNoVerifyAnySig(
// Merge beacon block, randao and attestations signatures into a set.
set := bls.NewSet()
set.Join(bSet).Join(rSet).Join(aSet)
set.Join(rSet).Join(aSet)
if blk.Version() >= version.Capella {
changes, err := signed.Block().Body().BLSToExecutionChanges()

View File

@@ -157,9 +157,8 @@ func TestProcessBlockNoVerify_SigSetContainsDescriptions(t *testing.T) {
set, _, err := transition.ProcessBlockNoVerifyAnySig(t.Context(), beaconState, wsb)
require.NoError(t, err)
assert.Equal(t, len(set.Signatures), len(set.Descriptions), "Signatures and descriptions do not match up")
assert.Equal(t, "block signature", set.Descriptions[0])
assert.Equal(t, "randao signature", set.Descriptions[1])
assert.Equal(t, "attestation signature", set.Descriptions[2])
assert.Equal(t, "randao signature", set.Descriptions[0])
assert.Equal(t, "attestation signature", set.Descriptions[1])
}
func TestProcessOperationsNoVerifyAttsSigs_OK(t *testing.T) {

View File

@@ -67,9 +67,9 @@ func NewSyncNeeds(current CurrentSlotter, oldestSlotFlagPtr *primitives.Slot, bl
// Override spec minimum block retention with user-provided flag only if it is lower than the spec minimum.
sn.blockRetention = primitives.Epoch(params.BeaconConfig().MinEpochsForBlockRequests)
if oldestSlotFlagPtr != nil {
oldestEpoch := slots.ToEpoch(*oldestSlotFlagPtr)
if oldestEpoch < sn.blockRetention {
if *oldestSlotFlagPtr <= syncEpochOffset(current(), sn.blockRetention) {
sn.validOldestSlotPtr = oldestSlotFlagPtr
} else {
log.WithField("backfill-oldest-slot", *oldestSlotFlagPtr).

View File

@@ -128,6 +128,9 @@ func TestSyncNeedsInitialize(t *testing.T) {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
minBlobEpochs := params.BeaconConfig().MinEpochsForBlobsSidecarsRequest
minColEpochs := params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest
denebSlot := slots.UnsafeEpochStart(params.BeaconConfig().DenebForkEpoch)
fuluSlot := slots.UnsafeEpochStart(params.BeaconConfig().FuluForkEpoch)
minSlots := slots.UnsafeEpochStart(primitives.Epoch(params.BeaconConfig().MinEpochsForBlockRequests))
currentSlot := primitives.Slot(10000)
currentFunc := func() primitives.Slot { return currentSlot }
@@ -141,6 +144,7 @@ func TestSyncNeedsInitialize(t *testing.T) {
expectedCol primitives.Epoch
name string
input SyncNeeds
current func() primitives.Slot
}{
{
name: "basic initialization with no flags",
@@ -174,13 +178,13 @@ func TestSyncNeedsInitialize(t *testing.T) {
{
name: "valid oldestSlotFlagPtr (earlier than spec minimum)",
blobRetentionFlag: 0,
oldestSlotFlagPtr: func() *primitives.Slot {
slot := primitives.Slot(10)
return &slot
}(),
oldestSlotFlagPtr: &denebSlot,
expectValidOldest: true,
expectedBlob: minBlobEpochs,
expectedCol: minColEpochs,
current: func() primitives.Slot {
return fuluSlot + minSlots
},
},
{
name: "invalid oldestSlotFlagPtr (later than spec minimum)",
@@ -210,6 +214,9 @@ func TestSyncNeedsInitialize(t *testing.T) {
{
name: "both blob retention flag and oldest slot set",
blobRetentionFlag: minBlobEpochs + 5,
current: func() primitives.Slot {
return fuluSlot + minSlots
},
oldestSlotFlagPtr: func() *primitives.Slot {
slot := primitives.Slot(100)
return &slot
@@ -232,16 +239,27 @@ func TestSyncNeedsInitialize(t *testing.T) {
expectedBlob: 5000,
expectedCol: 5000,
},
{
name: "regression for deneb start",
blobRetentionFlag: 8212500,
expectValidOldest: true,
oldestSlotFlagPtr: &denebSlot,
current: func() primitives.Slot {
return fuluSlot + minSlots
},
expectedBlob: 8212500,
expectedCol: 8212500,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result, err := NewSyncNeeds(currentFunc, tc.oldestSlotFlagPtr, tc.blobRetentionFlag)
if tc.current == nil {
tc.current = currentFunc
}
result, err := NewSyncNeeds(tc.current, tc.oldestSlotFlagPtr, tc.blobRetentionFlag)
require.NoError(t, err)
// Check that current, deneb, fulu are set correctly
require.Equal(t, currentSlot, result.current())
// Check retention calculations
require.Equal(t, tc.expectedBlob, result.blobRetention)
require.Equal(t, tc.expectedCol, result.colRetention)

View File

@@ -38,6 +38,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -25,6 +25,7 @@ import (
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/spf13/afero"
"golang.org/x/sync/errgroup"
)
const (
@@ -185,73 +186,162 @@ func (dcs *DataColumnStorage) WarmCache() {
highestStoredEpoch := primitives.Epoch(0)
// Walk the data column filesystem to warm up the cache.
if err := afero.Walk(dcs.fs, ".", func(path string, info os.FileInfo, fileErr error) (err error) {
if fileErr != nil {
return fileErr
}
// If not a leaf, skip.
if info.IsDir() {
return nil
}
// Extract metadata from the file path.
fileMetadata, err := extractFileMetadata(path)
if err != nil {
log.WithError(err).Error("Error encountered while extracting file metadata")
return nil
}
// Open the data column filesystem file.
f, err := dcs.fs.Open(path)
if err != nil {
log.WithError(err).Error("Error encountered while opening data column filesystem file")
return nil
}
// Close the file.
defer func() {
// Overwrite the existing error only if it is nil, since the close error is less important.
closeErr := f.Close()
if closeErr != nil && err == nil {
err = closeErr
}
}()
// Read the metadata of the file.
metadata, err := dcs.metadata(f)
if err != nil {
log.WithError(err).Error("Error encountered while reading metadata from data column filesystem file")
return nil
}
// Check the indices.
indices := metadata.indices.all()
if len(indices) == 0 {
return nil
}
// Build the ident.
dataColumnsIdent := DataColumnsIdent{Root: fileMetadata.blockRoot, Epoch: fileMetadata.epoch, Indices: indices}
// Update the highest stored epoch.
highestStoredEpoch = max(highestStoredEpoch, fileMetadata.epoch)
// Set the ident in the cache.
if err := dcs.cache.set(dataColumnsIdent); err != nil {
log.WithError(err).Error("Error encountered while ensuring data column filesystem cache")
}
return nil
}); err != nil {
log.WithError(err).Error("Error encountered while walking data column filesystem.")
// List all period directories
periodFileInfos, err := afero.ReadDir(dcs.fs, ".")
if err != nil {
log.WithError(err).Error("Error reading top directory during warm cache")
return
}
// Prune the cache and the filesystem.
// Iterate through periods
for _, periodFileInfo := range periodFileInfos {
if !periodFileInfo.IsDir() {
continue
}
periodPath := periodFileInfo.Name()
// List all epoch directories in this period
epochFileInfos, err := afero.ReadDir(dcs.fs, periodPath)
if err != nil {
log.WithError(err).WithField("period", periodPath).Error("Error reading period directory during warm cache")
continue
}
// Iterate through epochs
for _, epochFileInfo := range epochFileInfos {
if !epochFileInfo.IsDir() {
continue
}
epochPath := path.Join(periodPath, epochFileInfo.Name())
// List all .sszs files in this epoch
files, err := listEpochFiles(dcs.fs, epochPath)
if err != nil {
log.WithError(err).WithField("epoch", epochPath).Error("Error listing epoch files during warm cache")
continue
}
if len(files) == 0 {
continue
}
// Process all files in this epoch in parallel
epochHighest, err := dcs.processEpochFiles(files)
if err != nil {
log.WithError(err).WithField("epoch", epochPath).Error("Error processing epoch files during warm cache")
}
highestStoredEpoch = max(highestStoredEpoch, epochHighest)
}
}
// Prune the cache and the filesystem
dcs.prune()
log.WithField("elapsed", time.Since(start)).Info("Data column filesystem cache warm-up complete")
totalElapsed := time.Since(start)
// Log summary
log.WithField("elapsed", totalElapsed).Info("Data column filesystem cache warm-up complete")
}
// listEpochFiles lists all .sszs files in an epoch directory.
func listEpochFiles(fs afero.Fs, epochPath string) ([]string, error) {
fileInfos, err := afero.ReadDir(fs, epochPath)
if err != nil {
return nil, errors.Wrap(err, "read epoch directory")
}
files := make([]string, 0, len(fileInfos))
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
continue
}
fileName := fileInfo.Name()
if strings.HasSuffix(fileName, "."+dataColumnsFileExtension) {
files = append(files, path.Join(epochPath, fileName))
}
}
return files, nil
}
// processEpochFiles processes all .sszs files in an epoch directory in parallel.
func (dcs *DataColumnStorage) processEpochFiles(files []string) (primitives.Epoch, error) {
var (
eg errgroup.Group
mu sync.Mutex
)
highestEpoch := primitives.Epoch(0)
for _, filePath := range files {
eg.Go(func() error {
epoch, err := dcs.processFile(filePath)
if err != nil {
log.WithError(err).WithField("file", filePath).Error("Error processing file during warm cache")
return nil
}
mu.Lock()
defer mu.Unlock()
highestEpoch = max(highestEpoch, epoch)
return nil
})
}
if err := eg.Wait(); err != nil {
return highestEpoch, err
}
return highestEpoch, nil
}
// processFile processes a single .sszs file.
func (dcs *DataColumnStorage) processFile(filePath string) (primitives.Epoch, error) {
// Extract metadata from the file path
fileMetadata, err := extractFileMetadata(filePath)
if err != nil {
return 0, errors.Wrap(err, "extract file metadata")
}
// Open the file (each goroutine gets its own FD)
f, err := dcs.fs.Open(filePath)
if err != nil {
return 0, errors.Wrap(err, "open file")
}
defer func() {
if closeErr := f.Close(); closeErr != nil {
log.WithError(closeErr).WithField("file", filePath).Error("Error closing file during warm cache")
}
}()
// Read metadata
metadata, err := dcs.metadata(f)
if err != nil {
return 0, errors.Wrap(err, "read metadata")
}
// Extract indices
indices := metadata.indices.all()
if len(indices) == 0 {
return fileMetadata.epoch, nil // No indices, skip
}
// Build ident and set in cache (thread-safe)
dataColumnsIdent := DataColumnsIdent{
Root: fileMetadata.blockRoot,
Epoch: fileMetadata.epoch,
Indices: indices,
}
if err := dcs.cache.set(dataColumnsIdent); err != nil {
return 0, errors.Wrap(err, "cache set")
}
return fileMetadata.epoch, nil
}
// Summary returns the DataColumnStorageSummary.

View File

@@ -72,7 +72,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",

View File

@@ -7,7 +7,6 @@ import (
"strings"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
@@ -58,7 +57,6 @@ var (
fuluEngineEndpoints = []string{
GetPayloadMethodV5,
GetBlobsV2,
GetBlobsV3,
}
)
@@ -100,8 +98,6 @@ const (
GetBlobsV1 = "engine_getBlobsV1"
// GetBlobsV2 request string for JSON-RPC.
GetBlobsV2 = "engine_getBlobsV2"
// GetBlobsV3 request string for JSON-RPC.
GetBlobsV3 = "engine_getBlobsV3"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
@@ -125,7 +121,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -552,22 +548,6 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return result, handleRPCError(err)
}
func (s *Service) GetBlobsV3(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV3")
defer span.End()
start := time.Now()
if !s.capabilityCache.has(GetBlobsV3) {
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV3))
}
getBlobsV3RequestsTotal.Inc()
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV3, versionedHashes)
getBlobsV3Latency.Observe(time.Since(start).Seconds())
return result, handleRPCError(err)
}
// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
@@ -678,51 +658,40 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
root := populator.Root()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
commitments, err := populator.Commitments()
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "commitments")
return nil, wrapWithBlockRoot(err, root, "commitments")
}
included, cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
log.Info("Received cells and proofs from execution client", "included", included, "cells count", len(cellsPerBlob), "err", err)
cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, populator)
haveAllBlobs := included.Count() == uint64(len(commitments))
log.Info("Constructed partial columns", "haveAllBlobs", haveAllBlobs)
// TODO: Check for err==nil before we go here.
if haveAllBlobs {
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, partialColumns, nil
// Return early if nothing is returned from the EL.
if len(cellsPerBlob) == 0 {
return nil, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "partial columns from column sidecar")
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
return nil, partialColumns, nil
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, nil
}
// fetchCellsAndProofsFromExecution fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
// The returned cells and proofs are compacted and will not contain entries for missing blobs.
// The returned bitlist is the bitlist of which blobs are present i.e. which are the blobs we have the blob data for. This
// will help index into the cells and proofs arrays.
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) (bitfield.Bitlist /* included parts */, [][]kzg.Cell, [][]kzg.Proof, error) {
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) ([][]kzg.Cell, [][]kzg.Proof, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -730,44 +699,24 @@ func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommi
versionedHashes = append(versionedHashes, versionedHash)
}
var blobAndProofs []*pb.BlobAndProofV2
// Fetch all blobsAndCellsProofs from the execution client.
var err error
useV3 := s.capabilityCache.has(GetBlobsV3)
if useV3 {
// v3 can return a partial response. V2 is all or nothing
blobAndProofs, err = s.GetBlobsV3(ctx, versionedHashes)
} else {
blobAndProofs, err = s.GetBlobsV2(ctx, versionedHashes)
}
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "get blobs V2/3")
return nil, nil, errors.Wrapf(err, "get blobs V2")
}
// Track complete vs partial responses for V3
if useV3 && len(blobAndProofs) > 0 {
nonNilCount := 0
for _, bp := range blobAndProofs {
if bp != nil {
nonNilCount++
}
}
if nonNilCount == len(versionedHashes) {
getBlobsV3CompleteResponsesTotal.Inc()
} else if nonNilCount > 0 {
getBlobsV3PartialResponsesTotal.Inc()
}
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
return nil, nil, nil
}
// Compute cells and proofs from the blobs and cell proofs.
included, cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(kzgCommitments)), blobAndProofs)
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobAndProofV2s)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
return included, cellsPerBlob, proofsPerBlob, nil
return cellsPerBlob, proofsPerBlob, nil
}
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.

View File

@@ -2587,7 +2587,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
@@ -2598,7 +2598,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2611,7 +2611,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})

View File

@@ -34,25 +34,6 @@ var (
Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000},
},
)
getBlobsV3RequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_requests_total",
Help: "Total number of engine_getBlobsV3 requests sent",
})
getBlobsV3CompleteResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_complete_responses_total",
Help: "Total number of complete engine_getBlobsV3 successful responses received",
})
getBlobsV3PartialResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_partial_responses_total",
Help: "Total number of engine_getBlobsV3 partial responses received",
})
getBlobsV3Latency = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "beacon_engine_getBlobsV3_request_duration_seconds",
Help: "Duration of engine_getBlobsV3 requests in seconds",
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 4},
},
)
errParseCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_parse_error_count",
Help: "The number of errors that occurred while parsing execution payload",

View File

@@ -118,8 +118,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
}
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
return e.DataColumnSidecars, nil, e.ErrorDataColumnSidecars
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
}
// GetTerminalBlockHash --

View File

@@ -668,7 +668,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DB: b.db,
StateGen: b.stateGen,
ClockWaiter: b.ClockWaiter,
PartialDataColumns: b.cliCtx.Bool(flags.PartialDataColumns.Name),
})
if err != nil {
return err

View File

@@ -51,7 +51,6 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",

View File

@@ -342,7 +342,7 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
// cancelled (whichever comes first).
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error {
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
@@ -352,7 +352,7 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
return errors.Wrap(err, "current fork digest")
}
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars, partialColumns)
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
return nil
}
@@ -360,7 +360,7 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) {
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
@@ -374,7 +374,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
for i, sidecar := range sidecars {
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
wg.Go(func() {
@@ -398,14 +398,6 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
return
}
if s.partialColumnBroadcaster != nil && i < len(partialColumns) {
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumns[i]); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)

View File

@@ -773,7 +773,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar}, nil)
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
require.NoError(t, err)
// Receive the message.

View File

@@ -26,7 +26,6 @@ const (
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
PartialDataColumns bool
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -29,7 +28,6 @@ type (
Broadcaster
SetStreamHandler
PubSubProvider
PartialColumnBroadcasterProvider
PubSubTopicUser
SenderEncoder
PeerManager
@@ -54,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -94,11 +92,6 @@ type (
PubSub() *pubsub.PubSub
}
// PubSubProvider provides the p2p pubsub protocol.
PartialColumnBroadcasterProvider interface {
PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// PeerManager abstracts some peer management methods from libp2p.
PeerManager interface {
Disconnect(peer.ID) error

View File

@@ -157,11 +157,6 @@ var (
Help: "The number of publish messages received via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubRecvSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_pub_size_total",
Help: "The total size of publish messages received via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_total",
Help: "The number of messages dropped via rpc for a particular control message",
@@ -176,11 +171,6 @@ var (
Help: "The number of publish messages dropped via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubDropSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_pub_size_total",
Help: "The total size of publish messages dropped via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_total",
Help: "The number of messages sent via rpc for a particular control message",
@@ -195,11 +185,6 @@ var (
Help: "The number of publish messages sent via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
Help: "The total size of publish messages sent via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
)
func (s *Service) updateMetrics() {

View File

@@ -1,27 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"metrics.go",
"partial.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster",
visibility = ["//visibility:public"],
deps = [
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//internal/logrusadapter:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,25 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_test")
go_test(
name = "go_default_test",
size = "medium",
srcs = ["two_node_test.go"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//x/simlibp2p:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_marcopolo_simnet//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,238 +0,0 @@
package integrationtest
import (
"context"
"crypto/rand"
"fmt"
"testing"
"testing/synctest"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
simlibp2p "github.com/libp2p/go-libp2p/x/simlibp2p"
"github.com/marcopolo/simnet"
"github.com/sirupsen/logrus"
)
// TestTwoNodePartialColumnExchange tests that two nodes can exchange partial columns
// and reconstruct the complete column. Node 1 has cells 0-2, Node 2 has cells 3-5.
// After exchange, both should have all cells.
func TestTwoNodePartialColumnExchange(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// Create a simulated libp2p network
latency := time.Millisecond * 10
network, meta, err := simlibp2p.SimpleLibp2pNetwork([]simlibp2p.NodeLinkSettingsAndCount{
{LinkSettings: simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
Uplink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
}, Count: 2},
}, simlibp2p.NetworkSettings{UseBlankHost: true})
require.NoError(t, err)
require.NoError(t, network.Start())
defer func() {
require.NoError(t, network.Close())
}()
defer func() {
for _, node := range meta.Nodes {
err := node.Close()
if err != nil {
panic(err)
}
}
}()
h1 := meta.Nodes[0]
h2 := meta.Nodes[1]
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
broadcaster1 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
broadcaster2 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
opts1 := broadcaster1.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
opts2 := broadcaster2.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps1, err := pubsub.NewGossipSub(ctx, h1, opts1...)
require.NoError(t, err)
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
}()
// Generate Test Data
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], []byte("test-block-root"))
numCells := 6
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
commitments[i] = make([]byte, 48)
cells[i] = make([]byte, 2048)
rand.Read(cells[i])
proofs[i] = make([]byte, 48)
fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
BodyRoot: blockRoot[:],
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
pc1, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
pc2, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
// Setup Topic and Subscriptions
digest := params.ForkDigest(0)
columnIndex := uint64(12)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topicStr := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) +
encoder.SszNetworkEncoder{}.ProtocolSuffix()
time.Sleep(100 * time.Millisecond)
topic1, err := ps1.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator that verifies the inclusion proof
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
}
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return true, fmt.Errorf("nil signed block header")
}
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
cellValidator := func(_ []blocks.CellProofBundle) error {
return nil
}
node1Complete := make(chan blocks.VerifiedRODataColumn, 1)
node2Complete := make(chan blocks.VerifiedRODataColumn, 1)
handler1 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 1: Completed! Column has %d cells", len(col.Column))
node1Complete <- col
}
handler2 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 2: Completed! Column has %d cells", len(col.Column))
node2Complete <- col
}
// Connect hosts
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
time.Sleep(300 * time.Millisecond)
// Subscribe to regular GossipSub (critical for partial message RPC exchange!)
sub1, err := topic1.Subscribe()
require.NoError(t, err)
defer sub1.Cancel()
sub2, err := topic2.Subscribe()
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
require.NoError(t, err)
// Wait for mesh to form
time.Sleep(2 * time.Second)
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2)
require.NoError(t, err)
// Wait for Completion
timeout := time.After(10 * time.Second)
var col1, col2 blocks.VerifiedRODataColumn
receivedCount := 0
for receivedCount < 2 {
select {
case col1 = <-node1Complete:
t.Log("Node 1 completed reconstruction")
receivedCount++
case col2 = <-node2Complete:
t.Log("Node 2 completed reconstruction")
receivedCount++
case <-timeout:
t.Fatalf("Timeout: Only %d/2 nodes completed", receivedCount)
}
}
// Verify both columns have all cells
assert.Equal(t, numCells, len(col1.Column), "Node 1 should have all cells")
assert.Equal(t, numCells, len(col2.Column), "Node 2 should have all cells")
assert.DeepSSZEqual(t, cells, col1.Column, "Node 1 cell mismatch")
assert.DeepSSZEqual(t, cells, col2.Column, "Node 2 cell mismatch")
})
}

View File

@@ -1,18 +0,0 @@
package partialdatacolumnbroadcaster
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
partialMessageUsefulCellsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_useful_cells_total",
Help: "Number of useful cells received via a partial message",
}, []string{"column_index"})
partialMessageCellsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_cells_received_total",
Help: "Number of total cells received via a partial message",
}, []string{"column_index"})
)

View File

@@ -1,544 +0,0 @@
package partialdatacolumnbroadcaster
import (
"bytes"
"log/slog"
"regexp"
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// TODOs:
// different eager push strategies:
// - no eager push
// - full column eager push
// - With debouncing - some factor of RTT
// - eager push missing cells
const TTLInSlots = 3
const maxConcurrentValidators = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
func extractColumnIndexFromTopic(topic string) (uint64, error) {
matches := dataColumnTopicRegex.FindStringSubmatch(topic)
if len(matches) < 2 {
return 0, errors.New("could not extract column index from topic")
}
return strconv.ParseUint(matches[1], 10, 64)
}
// HeaderValidator validates a PartialDataColumnHeader.
// Returns (reject, err) where:
// - reject=true, err!=nil: REJECT - peer should be penalized
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
logger *logrus.Logger
ps *pubsub.PubSub
stop chan struct{}
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
// map topic -> handler
handlers map[string]SubHandler
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
groupTTL map[string]int8
// validHeaderCache caches validated headers by group ID (works across topics)
validHeaderCache map[string]*ethpb.PartialDataColumnHeader
incomingReq chan request
}
type requestKind uint8
const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindHandleIncomingRPC
requestKindCellsValidated
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
}
type publish struct {
topic string
c blocks.PartialDataColumn
}
type subscribe struct {
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
}
type unsubscribe struct {
topic string
}
type rpcWithFrom struct {
*pubsub_pb.PartialMessagesExtension
from peer.ID
}
type cellsValidated struct {
validationTook time.Duration
topic string
group []byte
cellIndices []uint64
cells []blocks.CellProofBundle
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
}
}
// AppendPubSubOpts adds the necessary pubsub options to enable partial messages.
func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubsub.Option {
slogger := slog.New(logrusadapter.Handler{Logger: p.logger})
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
if len(left) == 0 {
return right
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return partialmessages.PartsMetadata(merged)
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
incomingRPC: rpcWithFrom{rpc, from},
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
}
return nil
},
}),
func(ps *pubsub.PubSub) error {
p.ps = ps
return nil
},
)
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
if p.stop != nil {
return
}
p.stop = make(chan struct{})
p.loop()
}
func (p *PartialColumnBroadcaster) loop() {
cleanup := time.NewTicker(time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot))
defer cleanup.Stop()
for {
select {
case <-p.stop:
return
case <-cleanup.C:
for groupID, ttl := range p.groupTTL {
if ttl > 0 {
p.groupTTL[groupID] = ttl - 1
continue
}
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
for topic, msgStore := range p.partialMsgStore {
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
}
}
}
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
p.logger.Error("Failed to handle incoming partial RPC", "err", err)
}
case requestKindCellsValidated:
err := p.handleCellsValidated(req.cellsValidated)
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
}
}
}
func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *blocks.PartialDataColumn {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
return nil
}
msg, ok := topicStore[string(group)]
if !ok {
return nil
}
return msg
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
hasMessage := len(rpcWithFrom.PartialMessage) > 0
var message ethpb.PartialDataColumnSidecar
if hasMessage {
err := message.UnmarshalSSZ(rpcWithFrom.PartialMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal partial message data")
}
}
topicID := rpcWithFrom.GetTopicID()
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
// Check cache first for this group
if cachedHeader, ok := p.validHeaderCache[string(groupID)]; ok {
header = cachedHeader
} else {
// We haven't seen this group before. Check if we have a valid header.
if len(message.Header) == 0 {
p.logger.Debug("No partial column found and no header in message, ignoring")
return nil
}
header = message.Header[0]
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
// REJECT case: penalize the peer
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
}
// Both REJECT and IGNORE: don't process further
return nil
}
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
if err != nil {
return err
}
newColumn, err := blocks.NewPartialDataColumn(
header.SignedBlockHeader,
columnIndex,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
if err != nil {
p.logger.WithError(err).WithFields(logrus.Fields{
"topic": topicID,
"columnIndex": columnIndex,
"numCommitments": len(header.KzgCommitments),
}).Error("Failed to create partial data column from header")
return err
}
// Save to store
topicStore, ok := p.partialMsgStore[topicID]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topicID] = topicStore
}
topicStore[string(newColumn.GroupID())] = &newColumn
p.groupTTL[string(newColumn.GroupID())] = TTLInSlots
ourDataColumn = &newColumn
shouldRepublish = true
}
if ourDataColumn == nil {
// We don't have a partial column for this. Can happen if we got cells
// without a header.
return nil
}
logger := p.logger.WithFields(logrus.Fields{
"from": rpcWithFrom.from,
"topic": topicID,
"group": groupID,
})
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
// Marco's thoughts. No, we don't need to do anything else here.
cellIndices, cellsToVerify, err := ourDataColumn.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return err
}
// Track cells received via partial message
if len(cellIndices) > 0 {
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
partialMessageCellsReceivedTotal.WithLabelValues(columnIndexStr).Add(float64(len(cellIndices)))
}
if len(cellsToVerify) > 0 {
p.concurrentValidatorSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := validator(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
return
}
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackUsefulMessage)
p.incomingReq <- request{
kind: requestKindCellsValidated,
cellsValidated: &cellsValidated{
validationTook: time.Since(start),
topic: topicID,
group: groupID,
cells: cellsToVerify,
cellIndices: cellIndices,
},
}
}()
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) error {
ourDataColumn := p.getDataColumn(cells.topic, cells.group)
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
if extended {
// Track useful cells (cells that extended our data)
partialMessageUsefulCellsTotal.WithLabelValues(columnIndexStr).Add(float64(len(cells.cells)))
// TODO: we could use the heuristic here that if this data was
// useful to us, it's likely useful to our peers and we should
// republish eagerly
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
p.stop = nil
}
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
topicStore[string(c.GroupID())] = &c
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
func (p *PartialColumnBroadcaster) Unsubscribe(topic string) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindUnsubscribe,
unsub: unsubscribe{
topic: topic,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
t, ok := p.topics[topic]
if !ok {
return errors.New("topic not found")
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}

View File

@@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")
assert.Equal(t, address, resAddress, "Unexpected address")
resDirection, err := p.Direction(id)
require.NoError(t, err)
@@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")
assert.Equal(t, address2, resAddress2, "Unexpected address")
resDirection2, err := p.Direction(id)
require.NoError(t, err)

View File

@@ -160,9 +160,6 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}
psOpts = append(psOpts, pubsub.WithDirectPeers(directPeersAddrInfos))
}
if s.partialColumnBroadcaster != nil {
psOpts = s.partialColumnBroadcaster.AppendPubSubOpts(psOpts)
}
return psOpts
}

View File

@@ -88,20 +88,20 @@ func (g gossipTracer) ThrottlePeer(p peer.ID) {
// RecvRPC .
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCPubRecvSize, pubsubRPCRecv, rpc)
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCRecv, rpc)
}
// SendRPC .
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCPubSentSize, pubsubRPCSent, rpc)
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCSent, rpc)
}
// DropRPC .
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCPubDropSize, pubsubRPCDrop, rpc)
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCDrop, rpc)
}
func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, pubSizeCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
subCtr.Add(float64(len(rpc.Subscriptions)))
if rpc.Control != nil {
ctrlCtr.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
@@ -110,17 +110,12 @@ func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pu
ctrlCtr.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
ctrlCtr.WithLabelValues("idontwant").Add(float64(len(rpc.Control.Idontwant)))
}
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
return
}
for _, msg := range rpc.Publish {
pubCtr.WithLabelValues(msg.GetTopic()).Inc()
pubSizeCtr.WithLabelValues(msg.GetTopic(), "false").Add(float64(msg.Size()))
}
if rpc.Partial != nil {
pubCtr.WithLabelValues(rpc.Partial.GetTopicID()).Inc()
pubSizeCtr.WithLabelValues(rpc.Partial.GetTopicID(), "true").Add(float64(rpc.Partial.Size()))
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
continue
}
pubCtr.WithLabelValues(*msg.Topic).Inc()
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
@@ -78,7 +77,6 @@ type Service struct {
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
partialColumnBroadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
@@ -149,10 +147,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
custodyInfoSet: make(chan struct{}),
}
if cfg.PartialDataColumns {
s.partialColumnBroadcaster = partialdatacolumnbroadcaster.NewBroadcaster(log.Logger)
}
ipAddr := prysmnetwork.IPAddr()
opts, err := s.buildOptions(ipAddr, s.privKey)
@@ -311,10 +305,6 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -324,10 +314,6 @@ func (s *Service) Stop() error {
if s.dv5Listener != nil {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}
@@ -364,10 +350,6 @@ func (s *Service) PubSub() *pubsub.PubSub {
return s.pubsub
}
func (s *Service) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return s.partialColumnBroadcaster
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {

View File

@@ -21,7 +21,6 @@ go_library(
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -109,10 +108,6 @@ func (*FakeP2P) PubSub() *pubsub.PubSub {
return nil
}
func (*FakeP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// MetadataSeq -- fake.
func (*FakeP2P) MetadataSeq() uint64 {
return 0
@@ -174,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
return nil
}

View File

@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -12,7 +12,6 @@ import (
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
@@ -234,7 +233,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}
@@ -300,10 +299,6 @@ func (p *TestP2P) PubSub() *pubsub.PubSub {
return p.pubsub
}
func (p *TestP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// Disconnect from a peer.
func (p *TestP2P) Disconnect(pid peer.ID) error {
return p.BHost.Network().ClosePeer(pid)

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v7/api"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
coreblocks "github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
corehelpers "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filters"
@@ -957,6 +958,13 @@ func (s *Server) validateConsensus(ctx context.Context, b *eth.GenericSignedBeac
}
}
}
blockRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not hash block")
}
if err := coreblocks.VerifyBlockSignatureUsingCurrentFork(parentState, blk, blockRoot); err != nil {
return errors.Wrap(err, "could not verify block signature")
}
_, err = transition.ExecuteStateTransition(ctx, parentState, blk)
if err != nil {
return errors.Wrap(err, "could not execute state transition")

View File

@@ -130,6 +130,10 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestationsV2")
defer span.End()
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
return
}
versionHeader := r.Header.Get(api.VersionHeader)
if versionHeader == "" {
httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
@@ -238,22 +242,14 @@ func (s *Server) handleAttestationsElectra(
},
})
targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get target state for attestation")
}
committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get committee for attestation")
}
att := singleAtt.ToAttestationElectra(committee)
wantedEpoch := slots.ToEpoch(att.Data.Slot)
// Broadcast first using CommitteeId directly (fast path)
// This matches gRPC behavior and avoids blocking on state fetching
wantedEpoch := slots.ToEpoch(singleAtt.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get head validator indices")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), singleAtt.CommitteeId, singleAtt.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
failedBroadcasts = append(failedBroadcasts, &server.IndexedError{
Index: i,
@@ -264,17 +260,35 @@ func (s *Server) handleAttestationsElectra(
}
continue
}
}
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
// Save to pool after broadcast (slow path - requires state fetching)
// Run in goroutine to avoid blocking the HTTP response
go func() {
for _, singleAtt := range validAttestations {
targetState, err := s.AttestationStateFetcher.AttestationTargetState(context.Background(), singleAtt.Data.Target)
if err != nil {
log.WithError(err).Error("Could not get target state for attestation")
continue
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
committee, err := corehelpers.BeaconCommitteeFromState(context.Background(), targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
if err != nil {
log.WithError(err).Error("Could not get committee for attestation")
continue
}
att := singleAtt.ToAttestationElectra(committee)
if features.Get().EnableExperimentalAttestationPool {
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
}
}
}
}()
if len(failedBroadcasts) > 0 {
log.WithFields(logrus.Fields{
@@ -470,6 +484,10 @@ func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Re
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
defer span.End()
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
return
}
var req structs.SubmitSyncCommitteeSignaturesRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {

View File

@@ -26,6 +26,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -622,6 +623,8 @@ func TestSubmitAttestationsV2(t *testing.T) {
HeadFetcher: chainService,
ChainInfoFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
OperationNotifier: &blockchainmock.MockOperationNotifier{},
AttestationStateFetcher: chainService,
}
@@ -654,6 +657,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("multiple", func(t *testing.T) {
@@ -673,6 +677,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("phase0 att post electra", func(t *testing.T) {
@@ -793,6 +798,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Source.Epoch)
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetData().Target.Root))
assert.Equal(t, primitives.Epoch(0), broadcaster.BroadcastAttestations[0].GetData().Target.Epoch)
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("multiple", func(t *testing.T) {
@@ -812,6 +818,7 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
time.Sleep(100 * time.Millisecond) // Wait for async pool save
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("no body", func(t *testing.T) {
@@ -861,6 +868,27 @@ func TestSubmitAttestationsV2(t *testing.T) {
assert.Equal(t, true, strings.Contains(e.Failures[0].Message, "Incorrect attestation signature"))
})
})
t.Run("syncing", func(t *testing.T) {
chainService := &blockchainmock.ChainService{}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
var body bytes.Buffer
_, err := body.WriteString(singleAtt)
require.NoError(t, err)
request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
request.Header.Set(api.VersionHeader, version.String(version.Phase0))
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.SubmitAttestationsV2(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
})
}
func TestListVoluntaryExits(t *testing.T) {
@@ -1057,14 +1085,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
t.Run("single", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
HeadFetcher: chainService,
},
}
@@ -1089,14 +1122,19 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
SyncCommitteeIndices: []primitives.CommitteeIndex{0},
},
HeadFetcher: chainService,
},
}
@@ -1120,13 +1158,18 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
})
t.Run("invalid", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
chainService := &blockchainmock.ChainService{
State: st,
}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
CoreService: &core.Service{
SyncCommitteePool: synccommittee.NewStore(),
P2P: broadcaster,
HeadFetcher: &blockchainmock.ChainService{
State: st,
},
HeadFetcher: chainService,
},
}
@@ -1149,7 +1192,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
})
t.Run("empty", func(t *testing.T) {
s := &Server{}
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
var body bytes.Buffer
_, err := body.WriteString("[]")
@@ -1166,7 +1215,13 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
})
t.Run("no body", func(t *testing.T) {
s := &Server{}
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
writer := httptest.NewRecorder()
@@ -1179,6 +1234,26 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, e.Code)
assert.Equal(t, true, strings.Contains(e.Message, "No data submitted"))
})
t.Run("syncing", func(t *testing.T) {
chainService := &blockchainmock.ChainService{State: st}
s := &Server{
HeadFetcher: chainService,
TimeFetcher: chainService,
OptimisticModeFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
var body bytes.Buffer
_, err := body.WriteString(singleSyncCommitteeMsg)
require.NoError(t, err)
request := httptest.NewRequest(http.MethodPost, "http://example.com", &body)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.SubmitSyncCommitteeSignatures(writer, request)
assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
assert.Equal(t, true, strings.Contains(writer.Body.String(), "Beacon node is currently syncing"))
})
}
func TestListBLSToExecutionChanges(t *testing.T) {

View File

@@ -40,6 +40,7 @@ func GetForkSchedule(w http.ResponseWriter, r *http.Request) {
httputil.WriteJson(w, &structs.GetForkScheduleResponse{
Data: data,
})
return
}
previous := schedule[0]
for _, entry := range schedule {

View File

@@ -52,24 +52,27 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
if err != nil {
return nil, err
}
if features.Get().EnableExperimentalAttestationPool {
if err = vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
go func() {
go func() {
if features.Get().EnableExperimentalAttestationPool {
if err := vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
attCopy := att.Copy()
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
return
}
}()
}
}
}()
return resp, nil
}
@@ -82,6 +85,10 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
if err != nil {
return nil, err
@@ -98,18 +105,17 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
singleAttCopy := singleAtt.Copy()
att := singleAttCopy.ToAttestationElectra(committee)
if features.Get().EnableExperimentalAttestationPool {
if err = vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
go func() {
go func() {
if features.Get().EnableExperimentalAttestationPool {
if err := vs.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
if err := vs.AttPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
return
}
}()
}
}
}()
return resp, nil
}

View File

@@ -38,6 +38,7 @@ func TestProposeAttestation(t *testing.T) {
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
TimeFetcher: chainService,
AttestationStateFetcher: chainService,
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
head := util.NewBeaconBlock()
head.Block.Slot = 999
@@ -141,6 +142,7 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
P2P: &mockp2p.MockBroadcaster{},
AttPool: attestations.NewPool(),
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
SyncChecker: &mockSync.Sync{IsSyncing: false},
}
req := util.HydrateAttestation(&ethpb.Attestation{})
@@ -149,6 +151,37 @@ func TestProposeAttestation_IncorrectSignature(t *testing.T) {
assert.ErrorContains(t, wanted, err)
}
func TestProposeAttestation_Syncing(t *testing.T) {
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
req := util.HydrateAttestation(&ethpb.Attestation{})
_, err := attesterServer.ProposeAttestation(t.Context(), req)
assert.ErrorContains(t, "Syncing to latest head", err)
s, ok := status.FromError(err)
require.Equal(t, true, ok)
assert.Equal(t, codes.Unavailable, s.Code())
}
func TestProposeAttestationElectra_Syncing(t *testing.T) {
attesterServer := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
}
req := &ethpb.SingleAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Root: make([]byte, 32)},
Target: &ethpb.Checkpoint{Root: make([]byte, 32)},
},
}
_, err := attesterServer.ProposeAttestationElectra(t.Context(), req)
assert.ErrorContains(t, "Syncing to latest head", err)
s, ok := status.FromError(err)
require.Equal(t, true, ok)
assert.Equal(t, codes.Unavailable, s.Code())
}
func TestGetAttestationData_OK(t *testing.T) {
block := util.NewBeaconBlock()
block.Block.Slot = 3*params.BeaconConfig().SlotsPerEpoch + 1

View File

@@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
builderapi "github.com/OffchainLabs/prysm/v7/api/client/builder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/builder"
@@ -309,7 +308,6 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
}
rob, err := blocks.NewROBlockWithRoot(block, root)
var partialColumns []blocks.PartialDataColumn
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
if errors.Is(err, builderapi.ErrBadGateway) {
@@ -317,7 +315,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, partialColumns, err = vs.handleUnblindedBlock(rob, req)
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
@@ -337,7 +335,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
wg.Wait()
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars, partialColumns); err != nil {
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
}
if err := <-errChan; err != nil {
@@ -354,10 +352,9 @@ func (vs *Server) broadcastAndReceiveSidecars(
root [fieldparams.RootLength]byte,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSidecars []blocks.RODataColumn,
partialColumns []blocks.PartialDataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, partialColumns); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -406,41 +403,34 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, []blocks.PartialDataColumn, error) {
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if block.Version() >= version.Fulu {
// Compute cells and proofs from the blobs and cell proofs.
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
return nil, nil, errors.Wrap(err, "data column sidcars")
}
included := bitfield.NewBitlist(uint64(len(cellsPerBlob)))
included = included.Not() // all bits set to 1
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, partialColumns, nil
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "build blob sidecars")
return nil, nil, errors.Wrap(err, "build blob sidecars")
}
return blobSidecars, nil, nil, nil
return blobSidecars, nil, nil
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
@@ -507,7 +497,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
}
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn, partialColumns []blocks.PartialDataColumn) error {
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn) error {
// We built this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, sidecar := range roSidecars {
@@ -516,7 +506,7 @@ func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars
}
// Broadcast sidecars (non blocking).
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars, partialColumns); err != nil {
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars); err != nil {
return errors.Wrap(err, "broadcast data column sidecars")
}

View File

@@ -58,7 +58,6 @@ go_library(
"validate_bls_to_execution_change.go",
"validate_data_column.go",
"validate_light_client.go",
"validate_partial_header.go",
"validate_proposer_slashing.go",
"validate_sync_committee_message.go",
"validate_sync_contribution_proof.go",
@@ -99,7 +98,6 @@ go_library(
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/slasher/types:go_default_library",

View File

@@ -2,7 +2,6 @@ package sync
import (
"context"
"iter"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
@@ -23,16 +22,9 @@ type signatureVerifier struct {
resChan chan error
}
type errorWithSegment struct {
err error
// segment is only available if the batched verification failed
segment peerdas.CellProofBundleSegment
}
type kzgVerifier struct {
sizeHint int
cellProofs iter.Seq[blocks.CellProofBundle]
resChan chan errorWithSegment
dataColumns []blocks.RODataColumn
resChan chan error
}
// A routine that runs in the background to perform batch
@@ -67,9 +59,8 @@ func (s *Service) verifierRoutine() {
// A routine that runs in the background to perform batch
// KZG verifications by draining the channel and processing all pending requests.
func (s *Service) kzgVerifierRoutine() {
kzgBatch := make([]*kzgVerifier, 0, 1)
for {
kzgBatch = kzgBatch[:0]
kzgBatch := make([]*kzgVerifier, 0, 1)
select {
case <-s.ctx.Done():
return
@@ -165,74 +156,46 @@ func performBatchAggregation(aggSet *bls.SignatureBatch) (*bls.SignatureBatch, e
}
func (s *Service) validateWithKzgBatchVerifier(ctx context.Context, dataColumns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
if len(dataColumns) == 0 {
return pubsub.ValidationReject, errors.New("no data columns provided")
}
// If there are no cells in this column, we can return early since there's nothing to validate.
allEmpty := true
for _, column := range dataColumns {
if len(column.Column) != 0 {
allEmpty = false
break
}
}
if allEmpty {
return pubsub.ValidationAccept, nil
}
sizeHint := len(dataColumns[0].Column) * len(dataColumns)
err := s.validateKZGProofs(ctx, sizeHint, blocks.RODataColumnsToCellProofBundles(dataColumns))
if ctx.Err() != nil {
return pubsub.ValidationIgnore, ctx.Err()
} else if err != nil {
return pubsub.ValidationReject, err
}
return pubsub.ValidationAccept, nil
}
func (s *Service) validateKZGProofs(ctx context.Context, sizeHint int, cellProofs iter.Seq[blocks.CellProofBundle]) error {
_, span := trace.StartSpan(ctx, "sync.validateKZGProofs")
_, span := trace.StartSpan(ctx, "sync.validateWithKzgBatchVerifier")
defer span.End()
timeout := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
resChan := make(chan errorWithSegment, 1)
verificationSet := &kzgVerifier{sizeHint: sizeHint, cellProofs: cellProofs, resChan: resChan}
resChan := make(chan error, 1)
verificationSet := &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case s.kzgChan <- verificationSet:
case <-ctx.Done():
return ctx.Err()
return pubsub.ValidationIgnore, ctx.Err()
}
select {
case <-ctx.Done():
return ctx.Err() // parent context canceled, give up
case errWithSegment := <-resChan:
if errWithSegment.err != nil {
err := errWithSegment.err
log.WithError(err).Trace("Could not perform batch verification of cells")
return pubsub.ValidationIgnore, ctx.Err() // parent context canceled, give up
case err := <-resChan:
if err != nil {
log.WithError(err).Trace("Could not perform batch verification")
tracing.AnnotateError(span, err)
// We failed batch verification. Try again in this goroutine without batching
return validateUnbatchedKZGProofs(ctx, errWithSegment.segment)
return s.validateUnbatchedColumnsKzg(ctx, dataColumns)
}
}
return nil
return pubsub.ValidationAccept, nil
}
func validateUnbatchedKZGProofs(ctx context.Context, segment peerdas.CellProofBundleSegment) error {
func (s *Service) validateUnbatchedColumnsKzg(ctx context.Context, columns []blocks.RODataColumn) (pubsub.ValidationResult, error) {
_, span := trace.StartSpan(ctx, "sync.validateUnbatchedColumnsKzg")
defer span.End()
start := time.Now()
if err := segment.Verify(); err != nil {
if err := peerdas.VerifyDataColumnsSidecarKZGProofs(columns); err != nil {
err = errors.Wrap(err, "could not verify")
tracing.AnnotateError(span, err)
return err
return pubsub.ValidationReject, err
}
verification.DataColumnBatchKZGVerificationHistogram.WithLabelValues("fallback").Observe(float64(time.Since(start).Milliseconds()))
return nil
return pubsub.ValidationAccept, nil
}
func verifyKzgBatch(kzgBatch []*kzgVerifier) {
@@ -240,16 +203,14 @@ func verifyKzgBatch(kzgBatch []*kzgVerifier) {
return
}
cellProofIters := make([]iter.Seq[blocks.CellProofBundle], 0, len(kzgBatch))
var sizeHint int
allDataColumns := make([]blocks.RODataColumn, 0, len(kzgBatch))
for _, kzgVerifier := range kzgBatch {
sizeHint += kzgVerifier.sizeHint
cellProofIters = append(cellProofIters, kzgVerifier.cellProofs)
allDataColumns = append(allDataColumns, kzgVerifier.dataColumns...)
}
var verificationErr error
start := time.Now()
segments, err := peerdas.BatchVerifyDataColumnsCellsKZGProofs(sizeHint, cellProofIters)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allDataColumns)
if err != nil {
verificationErr = errors.Wrap(err, "batch KZG verification failed")
} else {
@@ -257,14 +218,7 @@ func verifyKzgBatch(kzgBatch []*kzgVerifier) {
}
// Send the same result to all verifiers in the batch
for i, verifier := range kzgBatch {
var segment peerdas.CellProofBundleSegment
if verificationErr != nil {
segment = segments[i]
}
verifier.resChan <- errorWithSegment{
err: verificationErr,
segment: segment,
}
for _, verifier := range kzgBatch {
verifier.resChan <- verificationErr
}
}

View File

@@ -3,6 +3,7 @@ package sync
import (
"bytes"
"context"
"fmt"
"maps"
"slices"
"sync"
@@ -243,8 +244,10 @@ func requestDirectSidecarsFromPeers(
}
// Compute missing indices by root, excluding those already in storage.
var lastRoot [fieldparams.RootLength]byte
missingIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, len(incompleteRoots))
for root := range incompleteRoots {
lastRoot = root
storedIndices := storedIndicesByRoot[root]
missingIndices := make(map[uint64]bool, len(requestedIndices))
@@ -259,6 +262,7 @@ func requestDirectSidecarsFromPeers(
}
}
initialMissingRootCount := len(missingIndicesByRoot)
initialMissingCount := computeTotalCount(missingIndicesByRoot)
indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)
@@ -301,11 +305,19 @@ func requestDirectSidecarsFromPeers(
}
}
log.WithFields(logrus.Fields{
"duration": time.Since(start),
"initialMissingCount": initialMissingCount,
"finalMissingCount": computeTotalCount(missingIndicesByRoot),
}).Debug("Requested direct data column sidecars from peers")
log := log.WithFields(logrus.Fields{
"duration": time.Since(start),
"initialMissingRootCount": initialMissingRootCount,
"initialMissingCount": initialMissingCount,
"finalMissingRootCount": len(missingIndicesByRoot),
"finalMissingCount": computeTotalCount(missingIndicesByRoot),
})
if initialMissingRootCount == 1 {
log = log.WithField("root", fmt.Sprintf("%#x", lastRoot))
}
log.Debug("Requested direct data column sidecars from peers")
return verifiedColumnsByRoot, nil
}

View File

@@ -88,12 +88,12 @@ func TestVerifierRoutine(t *testing.T) {
go service.kzgVerifierRoutine()
dataColumns := createValidTestDataColumns(t, 1)
resChan := make(chan errorWithSegment, 1)
service.kzgChan <- &kzgVerifier{sizeHint: 1, cellProofs: blocks.RODataColumnsToCellProofBundles(dataColumns), resChan: resChan}
resChan := make(chan error, 1)
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
select {
case errWithSegment := <-resChan:
require.NoError(t, errWithSegment.err)
case err := <-resChan:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for verification result")
}
@@ -109,19 +109,19 @@ func TestVerifierRoutine(t *testing.T) {
go service.kzgVerifierRoutine()
const numRequests = 5
resChans := make([]chan errorWithSegment, numRequests)
resChans := make([]chan error, numRequests)
for i := range numRequests {
dataColumns := createValidTestDataColumns(t, 1)
resChan := make(chan errorWithSegment, 1)
resChan := make(chan error, 1)
resChans[i] = resChan
service.kzgChan <- &kzgVerifier{sizeHint: 1, cellProofs: blocks.RODataColumnsToCellProofBundles(dataColumns), resChan: resChan}
service.kzgChan <- &kzgVerifier{dataColumns: dataColumns, resChan: resChan}
}
for i := range numRequests {
select {
case errWithSegment := <-resChans[i]:
require.NoError(t, errWithSegment.err)
case err := <-resChans[i]:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatalf("timeout waiting for verification result %d", i)
}
@@ -158,14 +158,14 @@ func TestVerifyKzgBatch(t *testing.T) {
t.Run("all valid data columns succeed", func(t *testing.T) {
dataColumns := createValidTestDataColumns(t, 3)
resChan := make(chan errorWithSegment, 1)
kzgVerifiers := []*kzgVerifier{{sizeHint: 3, cellProofs: blocks.RODataColumnsToCellProofBundles(dataColumns), resChan: resChan}}
resChan := make(chan error, 1)
kzgVerifiers := []*kzgVerifier{{dataColumns: dataColumns, resChan: resChan}}
verifyKzgBatch(kzgVerifiers)
select {
case errWithSegment := <-resChan:
require.NoError(t, errWithSegment.err)
case err := <-resChan:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for batch verification")
}
@@ -176,14 +176,14 @@ func TestVerifyKzgBatch(t *testing.T) {
invalidColumns := createInvalidTestDataColumns(t, 1)
allColumns := append(validColumns, invalidColumns...)
resChan := make(chan errorWithSegment, 1)
kzgVerifiers := []*kzgVerifier{{sizeHint: 2, cellProofs: blocks.RODataColumnsToCellProofBundles(allColumns), resChan: resChan}}
resChan := make(chan error, 1)
kzgVerifiers := []*kzgVerifier{{dataColumns: allColumns, resChan: resChan}}
verifyKzgBatch(kzgVerifiers)
select {
case errWithSegment := <-resChan:
assert.NotNil(t, errWithSegment.err)
case err := <-resChan:
assert.NotNil(t, err)
case <-time.After(time.Second):
t.Fatal("timeout waiting for batch verification")
}

View File

@@ -256,16 +256,6 @@ var (
Help: "Count the number of data column sidecars obtained via the execution layer.",
},
)
usefulFullColumnsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_useful_full_columns_received_total",
Help: "Number of useful full columns (any cell being useful) received",
}, []string{"column_index"})
partialMessageColumnCompletionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_column_completions_total",
Help: "How often the partial message first completed the column",
}, []string{"column_index"})
)
func (s *Service) updateMetrics() {

View File

@@ -3,9 +3,9 @@ package sync
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"slices"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
@@ -21,13 +21,23 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var pendingAttsLimit = 32768
const pendingAttsLimit = 32768
// aggregatorIndexFilter defines how aggregator index should be handled in equality checks.
type aggregatorIndexFilter int
const (
// ignoreAggregatorIndex means aggregates differing only by aggregator index are considered equal.
ignoreAggregatorIndex aggregatorIndexFilter = iota
// includeAggregatorIndex means aggregator index must also match for aggregates to be considered equal.
includeAggregatorIndex
)
// This method processes pending attestations as a "known" block as arrived. With validations,
// the valid attestations get saved into the operation mem pool, and the invalid attestations gets deleted
@@ -50,16 +60,7 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
if len(attestations) > 0 {
start := time.Now()
s.processAttestations(ctx, attestations)
duration := time.Since(start)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
"duration": duration,
}).Debug("Verified and saved pending attestations to pool")
}
s.processAttestations(ctx, attestations)
randGen := rand.NewGenerator()
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
@@ -79,26 +80,71 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
// processAttestations processes a list of attestations.
// It assumes (for logging purposes only) that all attestations pertain to the same block.
func (s *Service) processAttestations(ctx context.Context, attestations []any) {
if len(attestations) == 0 {
return
}
firstAttestation := attestations[0]
var blockRoot []byte
switch v := firstAttestation.(type) {
case ethpb.Att:
blockRoot = v.GetData().BeaconBlockRoot
case ethpb.SignedAggregateAttAndProof:
blockRoot = v.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot
default:
log.Warnf("Unexpected attestation type %T, skipping processing", v)
return
}
validAggregates := make([]ethpb.SignedAggregateAttAndProof, 0, len(attestations))
startAggregate := time.Now()
atts := make([]ethpb.Att, 0, len(attestations))
aggregateAttAndProofCount := 0
for _, att := range attestations {
switch v := att.(type) {
case ethpb.Att:
atts = append(atts, v)
case ethpb.SignedAggregateAttAndProof:
s.processAggregate(ctx, v)
aggregateAttAndProofCount++
// Avoid processing multiple aggregates only differing by aggregator index.
if slices.ContainsFunc(validAggregates, func(other ethpb.SignedAggregateAttAndProof) bool {
return pendingAggregatesAreEqual(v, other, ignoreAggregatorIndex)
}) {
continue
}
if err := s.processAggregate(ctx, v); err != nil {
log.WithError(err).Debug("Pending aggregate attestation could not be processed")
continue
}
validAggregates = append(validAggregates, v)
default:
log.Warnf("Unexpected attestation type %T, skipping", v)
}
}
durationAggregateAttAndProof := time.Since(startAggregate)
startAtts := time.Now()
for _, bucket := range bucketAttestationsByData(atts) {
s.processAttestationBucket(ctx, bucket)
}
durationAtts := time.Since(startAtts)
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", blockRoot),
"totalCount": len(attestations),
"aggregateAttAndProofCount": aggregateAttAndProofCount,
"uniqueAggregateAttAndProofCount": len(validAggregates),
"attCount": len(atts),
"durationTotal": durationAggregateAttAndProof + durationAtts,
"durationAggregateAttAndProof": durationAggregateAttAndProof,
"durationAtts": durationAtts,
}).Debug("Verified and saved pending attestations to pool")
}
// attestationBucket groups attestations with the same AttestationData for batch processing.
@@ -303,21 +349,20 @@ func (s *Service) processVerifiedAttestation(
})
}
func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) error {
res, err := s.validateAggregatedAtt(ctx, aggregate)
if err != nil {
log.WithError(err).Debug("Pending aggregated attestation failed validation")
return
return errors.Wrap(err, "validate aggregated att")
}
if res != pubsub.ValidationAccept || !s.validateBlockInAttestation(ctx, aggregate) {
log.Debug("Pending aggregated attestation failed validation")
return
return errors.New("Pending aggregated attestation failed validation")
}
att := aggregate.AggregateAttestationAndProof().AggregateVal()
if err := s.saveAttestation(att); err != nil {
log.WithError(err).Debug("Could not save aggregated attestation")
return
return errors.Wrap(err, "save attestation")
}
_ = s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())
@@ -325,6 +370,8 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
log.WithError(err).Debug("Could not broadcast aggregated attestation")
}
return nil
}
// This defines how pending aggregates are saved in the map. The key is the
@@ -336,7 +383,7 @@ func (s *Service) savePendingAggregate(agg ethpb.SignedAggregateAttAndProof) {
s.savePending(root, agg, func(other any) bool {
a, ok := other.(ethpb.SignedAggregateAttAndProof)
return ok && pendingAggregatesAreEqual(agg, a)
return ok && pendingAggregatesAreEqual(agg, a, includeAggregatorIndex)
})
}
@@ -391,13 +438,19 @@ func (s *Service) savePending(root [32]byte, pending any, isEqual func(other any
s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], pending)
}
func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
// pendingAggregatesAreEqual checks if two pending aggregate attestations are equal.
// The filter parameter controls whether aggregator index is considered in the equality check.
func pendingAggregatesAreEqual(a, b ethpb.SignedAggregateAttAndProof, filter aggregatorIndexFilter) bool {
if a.Version() != b.Version() {
return false
}
if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
return false
if filter == includeAggregatorIndex {
if a.AggregateAttestationAndProof().GetAggregatorIndex() != b.AggregateAttestationAndProof().GetAggregatorIndex() {
return false
}
}
aAtt := a.AggregateAttestationAndProof().AggregateVal()
bAtt := b.AggregateAttestationAndProof().AggregateVal()
if aAtt.GetData().Slot != bAtt.GetData().Slot {

View File

@@ -94,7 +94,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
// Process block A (which exists and has no pending attestations)
// This should skip processing attestations for A and request blocks B and C
require.NoError(t, r.processPendingAttsForBlock(t.Context(), rootA))
require.LogsContain(t, hook, "Requesting block by root")
require.LogsContain(t, hook, "Requesting blocks by root")
}
func TestProcessPendingAtts_HasBlockSaveUnaggregatedAtt(t *testing.T) {
@@ -911,17 +911,17 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, true, pendingAggregatesAreEqual(a, b))
assert.Equal(t, true, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different version", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
b := &ethpb.SignedAggregateAttestationAndProofElectra{Message: &ethpb.AggregateAttestationAndProofElectra{AggregatorIndex: 1}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregator index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 1}}
b := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{AggregatorIndex: 2}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different slot", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -942,7 +942,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different committee index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -963,7 +963,7 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregation bits", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
@@ -984,7 +984,30 @@ func Test_pendingAggregatesAreEqual(t *testing.T) {
},
AggregationBits: bitfield.Bitlist{0b1000},
}}}
assert.Equal(t, false, pendingAggregatesAreEqual(a, b))
assert.Equal(t, false, pendingAggregatesAreEqual(a, b, includeAggregatorIndex))
})
t.Run("different aggregator index should be equal while ignoring aggregator index", func(t *testing.T) {
a := &ethpb.SignedAggregateAttestationAndProof{
Message: &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 1,
Aggregate: &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 1,
CommitteeIndex: 1,
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
b := &ethpb.SignedAggregateAttestationAndProof{
Message: &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 2,
Aggregate: &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 1,
CommitteeIndex: 1,
},
AggregationBits: bitfield.Bitlist{0b1111},
}}}
assert.Equal(t, true, pendingAggregatesAreEqual(a, b, ignoreAggregatorIndex))
})
}

View File

@@ -2,7 +2,6 @@ package sync
import (
"context"
"encoding/hex"
"fmt"
"slices"
"sync"
@@ -44,11 +43,13 @@ func (s *Service) processPendingBlocksQueue() {
if !s.chainIsStarted() {
return
}
locker.Lock()
defer locker.Unlock()
if err := s.processPendingBlocks(s.ctx); err != nil {
log.WithError(err).Debug("Could not process pending blocks")
}
locker.Unlock()
})
}
@@ -73,8 +74,10 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
randGen := rand.NewGenerator()
var parentRoots [][32]byte
blkRoots := make([][32]byte, 0, len(sortedSlots)*maxBlocksPerSlot)
// Iterate through sorted slots.
for _, slot := range sortedSlots {
for i, slot := range sortedSlots {
// Skip processing if slot is in the future.
if slot > s.cfg.clock.CurrentSlot() {
continue
@@ -91,6 +94,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
// Process each block in the queue.
for _, b := range blocksInCache {
start := time.Now()
totalDuration := time.Duration(0)
if err := blocks.BeaconBlockIsNil(b); err != nil {
continue
}
@@ -147,19 +153,34 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
cancelFunction()
// Process pending attestations for this block.
if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
log.WithError(err).Debug("Failed to process pending attestations for block")
}
blkRoots = append(blkRoots, blkRoot)
// Remove the processed block from the queue.
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
return err
}
log.WithFields(logrus.Fields{"slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:]))}).Debug("Processed pending block and cleared it in cache")
duration := time.Since(start)
totalDuration += duration
log.WithFields(logrus.Fields{
"slotIndex": fmt.Sprintf("%d/%d", i+1, len(sortedSlots)),
"slot": slot,
"root": fmt.Sprintf("%#x", blkRoot),
"duration": duration,
"totalDuration": totalDuration,
}).Debug("Processed pending block and cleared it in cache")
}
span.End()
}
for _, blkRoot := range blkRoots {
// Process pending attestations for this block.
if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
log.WithError(err).Debug("Failed to process pending attestations for block")
}
}
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}
@@ -379,6 +400,19 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
req = roots[:maxReqBlock]
}
if logrus.GetLevel() >= logrus.DebugLevel {
rootsStr := make([]string, 0, len(roots))
for _, req := range roots {
rootsStr = append(rootsStr, fmt.Sprintf("%#x", req))
}
log.WithFields(logrus.Fields{
"peer": pid,
"count": len(req),
"roots": rootsStr,
}).Debug("Requesting blocks by root")
}
// Send the request to the peer.
if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
tracing.AnnotateError(span, err)
@@ -438,8 +472,6 @@ func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte
roots = append(roots[:i], roots[i+1:]...)
continue
}
log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
}
return roots
}

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -16,13 +14,11 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -65,15 +61,6 @@ type subscribeParameters struct {
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
partial *partialSubscribeParameters
}
type partialSubscribeParameters struct {
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -333,35 +320,6 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
var ps *partialSubscribeParameters
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if broadcaster != nil {
ps = &partialSubscribeParameters{
broadcaster: broadcaster,
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(context.TODO(), header)
},
validate: func(cellsToVerify []blocks.CellProofBundle) error {
return s.validateKZGProofs(context.TODO(), len(cellsToVerify), slices.Values(cellsToVerify))
},
handle: func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
@@ -369,7 +327,6 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
nse: nse,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
partial: ps,
})
})
}
@@ -408,10 +365,11 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest)+s.cfg.p2p.Encoding().ProtocolSuffix(), validator, handle)
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
@@ -574,11 +532,7 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
topic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
if t.partial != nil {
_ = t.partial.broadcaster.Unsubscribe(topic)
}
s.unSubscribeFromTopic(topic)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
}
}
@@ -625,34 +579,9 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
topicStr := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
topicOpts := make([]pubsub.TopicOpt, 0, 2)
requestPartial := t.partial != nil
if requestPartial {
// TODO: do we want the ability to support partial messages without requesting them?
topicOpts = append(topicOpts, pubsub.RequestPartialMessages())
}
topic, err := s.cfg.p2p.JoinTopic(topicStr, topicOpts...)
if err != nil {
log.WithError(err).Error("Failed to join topic")
return
}
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")
}
}
// We still need to subscribe to the full columns as well as partial in
// case our peers don't support partial messages.
t.track(subnet, s.subscribeWithBase(topicStr, t.validate, t.handle))
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/config/features"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -190,10 +189,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
digest, err := s.currentForkDigest()
if err != nil {
return nil, err
}
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
@@ -219,30 +214,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
}
// Try to reconstruct data column constructedSidecars from the execution client.
constructedSidecars, partialColumns, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
constructedSidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
if err != nil {
return nil, errors.Wrap(err, "reconstruct data column sidecars")
}
partialBroadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if partialBroadcaster != nil {
log.WithField("len(partialColumns)", len(partialColumns)).Debug("Publishing partial columns")
for i := range uint64(len(partialColumns)) {
if !columnIndicesToSample[i] {
continue
}
subnet := peerdas.ComputeSubnetForDataColumnSidecar(i)
topic := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix()
// Publish the partial column. This is idempotent if we republish the same data twice.
// Note, the "partial column" may indeed be complete. We still
// should publish to help our peers.
err = partialBroadcaster.Publish(topic, partialColumns[i])
if err != nil {
log.WithError(err).Warn("Failed to publish partial column")
}
}
}
// No sidecars are retrieved from the EL, retry later
constructedSidecarCount = uint64(len(constructedSidecars))
if constructedSidecarCount == 0 {
@@ -309,7 +285,7 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars, nil); err != nil {
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars")
}

View File

@@ -3,7 +3,6 @@ package sync
import (
"context"
"fmt"
"strconv"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
@@ -25,13 +24,6 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg)
}
// Track useful full columns received via gossip (not previously seen)
slot := sidecar.SignedBlockHeader.Header.Slot
proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, sidecar.Index) {
usefulFullColumnsReceivedTotal.WithLabelValues(strconv.FormatUint(sidecar.Index, 10)).Inc()
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
}
@@ -59,38 +51,6 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return nil
}
func (s *Service) verifiedRODataColumnSubscriber(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
log.WithField("slot", sidecar.Slot()).WithField("column", sidecar.Index).Info("Received data column sidecar")
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
}
var wg errgroup.Group
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction")
}
return nil
})
wg.Go(func() error {
// Broadcast our complete column for peers that don't use partial messages
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{sidecar}, nil); err != nil {
return errors.Wrap(err, "process data column sidecars from execution")
}
return nil
})
if err := wg.Wait(); err != nil {
return err
}
return nil
}
// receiveDataColumnSidecar receives a single data column sidecar: marks it as seen and saves it to the chain.
// Do not loop over this function to receive multiple sidecars, use receiveDataColumnSidecars instead.
func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {

View File

@@ -3,11 +3,12 @@ package sync
import (
"context"
"fmt"
"math"
"slices"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -51,14 +52,12 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
// Decode the message, reject if it fails.
m, err := s.decodePubsubMessage(msg)
if err != nil {
log.WithError(err).Error("Failed to decode message")
return pubsub.ValidationReject, err
}
// Reject messages that are not of the expected type.
dcsc, ok := m.(*eth.DataColumnSidecar)
if !ok {
log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
return pubsub.ValidationReject, errWrongMessage
}
@@ -72,7 +71,6 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
roDataColumns := []blocks.RODataColumn{roDataColumn}
// Create the verifier.
// Question(marco): Do we want the multiple columns verifier? Is batching used only for kzg proofs?
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
// Start the verification process.
@@ -146,12 +144,9 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
}
// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
validationResult, err := s.validateWithKzgBatchVerifier(ctx, roDataColumns)
if validationResult != pubsub.ValidationAccept {
return validationResult, err
if err := verifier.SidecarKzgProofVerified(); err != nil {
return pubsub.ValidationReject, err
}
// Mark KZG verification as satisfied since we did it via batch verifier
verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
// with valid header signature, sidecar inclusion proof, and kzg proof.
@@ -195,19 +190,13 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
dataColumnSidecarArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
dataColumnSidecarVerificationGossipHistogram.Observe(float64(validationTime.Milliseconds()))
peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)
select {
case s.dataColumnLogCh <- dataColumnLogEntry{
Slot: roDataColumn.Slot(),
ColIdx: roDataColumn.Index,
PropIdx: roDataColumn.ProposerIndex(),
BlockRoot: roDataColumn.BlockRoot(),
ParentRoot: roDataColumn.ParentRoot(),
PeerSuffix: pid.String()[len(pid.String())-6:],
PeerGossipScore: peerGossipScore,
validationTime: validationTime,
sinceStartTime: sinceSlotStartTime,
slot: roDataColumn.Slot(),
index: roDataColumn.Index,
root: roDataColumn.BlockRoot(),
validationTime: validationTime,
sinceStartTime: sinceSlotStartTime,
}:
default:
log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
@@ -252,68 +241,69 @@ func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorInd
}
type dataColumnLogEntry struct {
Slot primitives.Slot
ColIdx uint64
PropIdx primitives.ValidatorIndex
BlockRoot [32]byte
ParentRoot [32]byte
PeerSuffix string
PeerGossipScore float64
validationTime time.Duration
sinceStartTime time.Duration
slot primitives.Slot
index uint64
root [32]byte
validationTime time.Duration
sinceStartTime time.Duration
}
func (s *Service) processDataColumnLogs() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
slotStats := make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
slotStats := make(map[[fieldparams.RootLength]byte][]dataColumnLogEntry)
for {
select {
case entry := <-s.dataColumnLogCh:
cols := slotStats[entry.Slot]
cols[entry.ColIdx] = entry
slotStats[entry.Slot] = cols
case col := <-s.dataColumnLogCh:
cols := slotStats[col.root]
cols = append(cols, col)
slotStats[col.root] = cols
case <-ticker.C:
for slot, columns := range slotStats {
var (
colIndices = make([]uint64, 0, fieldparams.NumberOfColumns)
peers = make([]string, 0, fieldparams.NumberOfColumns)
gossipScores = make([]float64, 0, fieldparams.NumberOfColumns)
validationTimes = make([]string, 0, fieldparams.NumberOfColumns)
sinceStartTimes = make([]string, 0, fieldparams.NumberOfColumns)
)
for root, columns := range slotStats {
indices := make([]uint64, 0, fieldparams.NumberOfColumns)
minValidationTime, maxValidationTime, sumValidationTime := time.Duration(0), time.Duration(0), time.Duration(0)
minSinceStartTime, maxSinceStartTime, sumSinceStartTime := time.Duration(0), time.Duration(0), time.Duration(0)
totalReceived := 0
for _, entry := range columns {
if entry.PeerSuffix == "" {
for _, column := range columns {
indices = append(indices, column.index)
sumValidationTime += column.validationTime
sumSinceStartTime += column.sinceStartTime
if totalReceived == 0 {
minValidationTime, maxValidationTime = column.validationTime, column.validationTime
minSinceStartTime, maxSinceStartTime = column.sinceStartTime, column.sinceStartTime
totalReceived++
continue
}
colIndices = append(colIndices, entry.ColIdx)
peers = append(peers, entry.PeerSuffix)
gossipScores = append(gossipScores, roundFloat(entry.PeerGossipScore, 2))
validationTimes = append(validationTimes, fmt.Sprintf("%.2fms", float64(entry.validationTime.Milliseconds())))
sinceStartTimes = append(sinceStartTimes, fmt.Sprintf("%.2fms", float64(entry.sinceStartTime.Milliseconds())))
minValidationTime, maxValidationTime = min(minValidationTime, column.validationTime), max(maxValidationTime, column.validationTime)
minSinceStartTime, maxSinceStartTime = min(minSinceStartTime, column.sinceStartTime), max(maxSinceStartTime, column.sinceStartTime)
totalReceived++
}
log.WithFields(logrus.Fields{
"slot": slot,
"receivedCount": totalReceived,
"columnIndices": colIndices,
"peers": peers,
"gossipScores": gossipScores,
"validationTimes": validationTimes,
"sinceStartTimes": sinceStartTimes,
}).Debug("Accepted data column sidecars summary")
if totalReceived > 0 {
slices.Sort(indices)
avgValidationTime := sumValidationTime / time.Duration(totalReceived)
avgSinceStartTime := sumSinceStartTime / time.Duration(totalReceived)
log.WithFields(logrus.Fields{
"slot": columns[0].slot,
"root": fmt.Sprintf("%#x", root),
"count": totalReceived,
"indices": helpers.PrettySlice(indices),
"validationTime": prettyMinMaxAverage(minValidationTime, maxValidationTime, avgValidationTime),
"sinceStartTime": prettyMinMaxAverage(minSinceStartTime, maxSinceStartTime, avgSinceStartTime),
}).Debug("Accepted data column sidecars summary")
}
}
slotStats = make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
slotStats = make(map[[fieldparams.RootLength]byte][]dataColumnLogEntry)
}
}
}
func roundFloat(f float64, decimals int) float64 {
mult := math.Pow(10, float64(decimals))
return math.Round(f*mult) / mult
func prettyMinMaxAverage(min, max, average time.Duration) string {
return fmt.Sprintf("[min: %v, avg: %v, max: %v]", min, average, max)
}

View File

@@ -1,142 +0,0 @@
package sync
import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
)
var (
// REJECT errors - peer should be penalized
errHeaderEmptyCommitments = errors.New("header has no kzg commitments")
errHeaderParentInvalid = errors.New("header parent invalid")
errHeaderSlotNotAfterParent = errors.New("header slot not after parent")
errHeaderNotFinalizedDescendant = errors.New("header not finalized descendant")
errHeaderInvalidInclusionProof = errors.New("invalid inclusion proof")
errHeaderInvalidSignature = errors.New("invalid proposer signature")
errHeaderUnexpectedProposer = errors.New("unexpected proposer index")
// IGNORE errors - don't penalize peer
errHeaderNil = errors.New("nil header")
errHeaderFromFuture = errors.New("header is from future slot")
errHeaderNotAboveFinalized = errors.New("header slot not above finalized")
errHeaderParentNotSeen = errors.New("header parent not seen")
)
// validatePartialDataColumnHeader validates a PartialDataColumnHeader per the consensus spec.
// Returns (reject, err) where reject=true means the peer should be penalized.
func (s *Service) validatePartialDataColumnHeader(ctx context.Context, header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil || header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return false, errHeaderNil // IGNORE
}
blockHeader := header.SignedBlockHeader.Header
headerSlot := blockHeader.Slot
parentRoot := bytesutil.ToBytes32(blockHeader.ParentRoot)
// [REJECT] kzg_commitments list is non-empty
if len(header.KzgCommitments) == 0 {
return true, errHeaderEmptyCommitments
}
// [IGNORE] Not from future slot (with MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
currentSlot := s.cfg.clock.CurrentSlot()
if headerSlot > currentSlot {
maxDisparity := params.BeaconConfig().MaximumGossipClockDisparityDuration()
slotStart, err := s.cfg.clock.SlotStart(headerSlot)
if err != nil {
return false, err
}
if s.cfg.clock.Now().Before(slotStart.Add(-maxDisparity)) {
return false, errHeaderFromFuture // IGNORE
}
}
// [IGNORE] Slot above finalized
finalizedCheckpoint := s.cfg.chain.FinalizedCheckpt()
startSlot, err := slots.EpochStart(finalizedCheckpoint.Epoch)
if err != nil {
return false, err
}
if headerSlot <= startSlot {
return false, errHeaderNotAboveFinalized // IGNORE
}
// [IGNORE] Parent has been seen
if !s.cfg.chain.HasBlock(ctx, parentRoot) {
return false, errHeaderParentNotSeen // IGNORE
}
// [REJECT] Parent passes validation (not a bad block)
if s.hasBadBlock(parentRoot) {
return true, errHeaderParentInvalid
}
// [REJECT] Header slot > parent slot
parentSlot, err := s.cfg.chain.RecentBlockSlot(parentRoot)
if err != nil {
return false, errors.Wrap(err, "get parent slot")
}
if headerSlot <= parentSlot {
return true, errHeaderSlotNotAfterParent
}
// [REJECT] Finalized checkpoint is ancestor (parent is in forkchoice)
if !s.cfg.chain.InForkchoice(parentRoot) {
return true, errHeaderNotFinalizedDescendant
}
// [REJECT] Inclusion proof valid
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, errHeaderInvalidInclusionProof
}
// [REJECT] Valid proposer signature
parentState, err := s.cfg.stateGen.StateByRoot(ctx, parentRoot)
if err != nil {
return false, errors.Wrap(err, "get parent state")
}
proposerIdx := blockHeader.ProposerIndex
proposer, err := parentState.ValidatorAtIndex(proposerIdx)
if err != nil {
return false, errors.Wrap(err, "get proposer")
}
domain, err := signing.Domain(
parentState.Fork(),
slots.ToEpoch(headerSlot),
params.BeaconConfig().DomainBeaconProposer,
parentState.GenesisValidatorsRoot(),
)
if err != nil {
return false, errors.Wrap(err, "get domain")
}
if err := signing.VerifyBlockHeaderSigningRoot(
blockHeader,
proposer.PublicKey,
header.SignedBlockHeader.Signature,
domain,
); err != nil {
return true, errHeaderInvalidSignature
}
// [REJECT] Expected proposer for slot
expectedProposer, err := helpers.BeaconProposerIndexAtSlot(ctx, parentState, headerSlot)
if err != nil {
return false, errors.Wrap(err, "compute expected proposer")
}
if expectedProposer != proposerIdx {
return true, errHeaderUnexpectedProposer
}
return false, nil // Valid header
}

View File

@@ -687,6 +687,12 @@ func sbrNotFound(t *testing.T, expectedRoot [32]byte) *mockStateByRooter {
}}
}
func sbrReturnsState(st state.BeaconState) *mockStateByRooter {
return &mockStateByRooter{sbr: func(_ context.Context, _ [32]byte) (state.BeaconState, error) {
return st, nil
}}
}
func sbrForValOverride(idx primitives.ValidatorIndex, val *ethpb.Validator) *mockStateByRooter {
return sbrForValOverrideWithT(nil, idx, val)
}

View File

@@ -11,12 +11,10 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/runtime/logging"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -361,7 +359,7 @@ func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.
}
if !dv.fc.HasNode(parentRoot) {
return columnErrBuilder(errSidecarParentNotSeen)
return columnErrBuilder(errors.Wrapf(errSidecarParentNotSeen, "parent root: %#x", parentRoot))
}
}
@@ -484,88 +482,19 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e
defer dv.recordResult(RequireSidecarProposerExpected, &err)
type slotParentRoot struct {
slot primitives.Slot
parentRoot [fieldparams.RootLength]byte
}
targetRootBySlotParentRoot := make(map[slotParentRoot][fieldparams.RootLength]byte)
var targetRootFromCache = func(slot primitives.Slot, parentRoot [fieldparams.RootLength]byte) ([fieldparams.RootLength]byte, error) {
// Use cached values if available.
slotParentRoot := slotParentRoot{slot: slot, parentRoot: parentRoot}
if root, ok := targetRootBySlotParentRoot[slotParentRoot]; ok {
return root, nil
}
// Compute the epoch of the data column slot.
dataColumnEpoch := slots.ToEpoch(slot)
if dataColumnEpoch > 0 {
dataColumnEpoch = dataColumnEpoch - 1
}
// Compute the target root for the epoch.
targetRoot, err := dv.fc.TargetRootForEpoch(parentRoot, dataColumnEpoch)
if err != nil {
return [fieldparams.RootLength]byte{}, columnErrBuilder(errors.Wrap(err, "target root from epoch"))
}
// Store the target root in the cache.
targetRootBySlotParentRoot[slotParentRoot] = targetRoot
return targetRoot, nil
}
for _, dataColumn := range dv.dataColumns {
// Extract the slot of the data column.
dataColumnSlot := dataColumn.Slot()
// Extract the root of the parent block corresponding to the data column.
parentRoot := dataColumn.ParentRoot()
// Compute the target root for the data column.
targetRoot, err := targetRootFromCache(dataColumnSlot, parentRoot)
// Get the verifying state, it is guaranteed to have the correct proposer in the lookahead.
verifyingState, err := dv.getVerifyingState(ctx, dataColumn)
if err != nil {
return columnErrBuilder(errors.Wrap(err, "target root"))
return columnErrBuilder(errors.Wrap(err, "verifying state"))
}
// Compute the epoch of the data column slot.
dataColumnEpoch := slots.ToEpoch(dataColumnSlot)
if dataColumnEpoch > 0 {
dataColumnEpoch = dataColumnEpoch - 1
}
// Create a checkpoint for the target root.
checkpoint := &forkchoicetypes.Checkpoint{Root: targetRoot, Epoch: dataColumnEpoch}
// Try to extract the proposer index from the data column in the cache.
idx, cached := dv.pc.Proposer(checkpoint, dataColumnSlot)
if !cached {
parentRoot := dataColumn.ParentRoot()
// Ensure the expensive index computation is only performed once for
// concurrent requests for the same signature data.
idxAny, err, _ := dv.sg.Do(concatRootSlot(parentRoot, dataColumnSlot), func() (any, error) {
verifyingState, err := dv.getVerifyingState(ctx, dataColumn)
if err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "verifying state"))
}
idx, err = helpers.BeaconProposerIndexAtSlot(ctx, verifyingState, dataColumnSlot)
if err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "compute proposer"))
}
return idx, nil
})
if err != nil {
return err
}
var ok bool
if idx, ok = idxAny.(primitives.ValidatorIndex); !ok {
return columnErrBuilder(errors.New("type assertion to ValidatorIndex failed"))
}
// Use proposer lookahead directly
idx, err := helpers.BeaconProposerIndexAtSlot(ctx, verifyingState, dataColumnSlot)
if err != nil {
return columnErrBuilder(errors.Wrap(err, "proposer from lookahead"))
}
if idx != dataColumn.ProposerIndex() {
@@ -626,7 +555,3 @@ func inclusionProofKey(c blocks.RODataColumn) ([32]byte, error) {
return sha256.Sum256(unhashedKey), nil
}
func concatRootSlot(root [fieldparams.RootLength]byte, slot primitives.Slot) string {
return string(root[:]) + fmt.Sprintf("%d", slot)
}

View File

@@ -2,7 +2,6 @@ package verification
import (
"reflect"
"sync"
"testing"
"time"
@@ -795,87 +794,90 @@ func TestDataColumnsSidecarProposerExpected(t *testing.T) {
blobCount = 1
)
parentRoot := [fieldparams.RootLength]byte{}
columns := GenerateTestDataColumns(t, parentRoot, columnSlot, blobCount)
firstColumn := columns[0]
ctx := t.Context()
testCases := []struct {
name string
stateByRooter StateByRooter
proposerCache proposerCache
columns []blocks.RODataColumn
error string
}{
{
name: "Cached, matches",
stateByRooter: nil,
proposerCache: &mockProposerCache{
ProposerCB: pcReturnsIdx(firstColumn.ProposerIndex()),
},
columns: columns,
},
{
name: "Cached, does not match",
stateByRooter: nil,
proposerCache: &mockProposerCache{
ProposerCB: pcReturnsIdx(firstColumn.ProposerIndex() + 1),
},
columns: columns,
error: errSidecarUnexpectedProposer.Error(),
},
{
name: "Not cached, state lookup failure",
stateByRooter: sbrNotFound(t, firstColumn.ParentRoot()),
proposerCache: &mockProposerCache{
ProposerCB: pcReturnsNotFound(),
},
columns: columns,
error: "verifying state",
},
}
parentRoot := [fieldparams.RootLength]byte{}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
initializer := Initializer{
shared: &sharedResources{
sr: tc.stateByRooter,
pc: tc.proposerCache,
hsp: &mockHeadStateProvider{},
fc: &mockForkchoicer{
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
},
// Create a Fulu state to get the expected proposer from the lookahead.
fuluState, _ := util.DeterministicGenesisStateFulu(t, 32)
expectedProposer, err := fuluState.ProposerLookahead()
require.NoError(t, err)
expectedProposerIdx := primitives.ValidatorIndex(expectedProposer[columnSlot])
// Generate data columns with the expected proposer index.
matchingColumns := generateTestDataColumnsWithProposer(t, parentRoot, columnSlot, blobCount, expectedProposerIdx)
// Generate data columns with wrong proposer index.
wrongColumns := generateTestDataColumnsWithProposer(t, parentRoot, columnSlot, blobCount, expectedProposerIdx+1)
t.Run("Proposer matches", func(t *testing.T) {
initializer := Initializer{
shared: &sharedResources{
sr: sbrReturnsState(fuluState),
hsp: &mockHeadStateProvider{
headRoot: parentRoot[:],
headSlot: columnSlot, // Same epoch so HeadStateReadOnly is used
headStateReadOnly: fuluState,
},
}
fc: &mockForkchoicer{},
},
}
verifier := initializer.NewDataColumnsVerifier(tc.columns, GossipDataColumnSidecarRequirements)
var wg sync.WaitGroup
verifier := initializer.NewDataColumnsVerifier(matchingColumns, GossipDataColumnSidecarRequirements)
err := verifier.SidecarProposerExpected(ctx)
require.NoError(t, err)
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
require.NoError(t, verifier.results.result(RequireSidecarProposerExpected))
})
var err1, err2 error
wg.Go(func() {
err1 = verifier.SidecarProposerExpected(ctx)
})
wg.Go(func() {
err2 = verifier.SidecarProposerExpected(ctx)
})
wg.Wait()
t.Run("Proposer does not match", func(t *testing.T) {
initializer := Initializer{
shared: &sharedResources{
sr: sbrReturnsState(fuluState),
hsp: &mockHeadStateProvider{
headRoot: parentRoot[:],
headSlot: columnSlot, // Same epoch so HeadStateReadOnly is used
headStateReadOnly: fuluState,
},
fc: &mockForkchoicer{},
},
}
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
verifier := initializer.NewDataColumnsVerifier(wrongColumns, GossipDataColumnSidecarRequirements)
err := verifier.SidecarProposerExpected(ctx)
require.ErrorContains(t, errSidecarUnexpectedProposer.Error(), err)
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
})
if len(tc.error) > 0 {
require.ErrorContains(t, tc.error, err1)
require.ErrorContains(t, tc.error, err2)
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
return
}
t.Run("State lookup failure", func(t *testing.T) {
columns := GenerateTestDataColumns(t, parentRoot, columnSlot, blobCount)
initializer := Initializer{
shared: &sharedResources{
sr: sbrNotFound(t, columns[0].ParentRoot()),
hsp: &mockHeadStateProvider{},
fc: &mockForkchoicer{},
},
}
require.NoError(t, err1)
require.NoError(t, err2)
require.NoError(t, verifier.results.result(RequireSidecarProposerExpected))
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
err := verifier.SidecarProposerExpected(ctx)
require.ErrorContains(t, "verifying state", err)
require.Equal(t, true, verifier.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, verifier.results.result(RequireSidecarProposerExpected))
})
}
err := verifier.SidecarProposerExpected(ctx)
require.NoError(t, err)
})
func generateTestDataColumnsWithProposer(t *testing.T, parent [fieldparams.RootLength]byte, slot primitives.Slot, blobCount int, proposer primitives.ValidatorIndex) []blocks.RODataColumn {
roBlock, roBlobs := util.GenerateTestDenebBlockWithSidecar(t, parent, slot, blobCount, util.WithProposer(proposer))
blobs := make([]kzg.Blob, 0, len(roBlobs))
for i := range roBlobs {
blobs = append(blobs, kzg.Blob(roBlobs[i].Blob))
}
cellsPerBlob, proofsPerBlob := util.GenerateCellsAndProofs(t, blobs)
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
return roDataColumnSidecars
}
func TestColumnRequirementSatisfaction(t *testing.T) {
@@ -922,12 +924,3 @@ func TestColumnRequirementSatisfaction(t *testing.T) {
require.NoError(t, err)
}
func TestConcatRootSlot(t *testing.T) {
root := [fieldparams.RootLength]byte{1, 2, 3}
const slot = primitives.Slot(3210)
const expected = "\x01\x02\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x003210"
actual := concatRootSlot(root, slot)
require.Equal(t, expected, actual)
}

View File

@@ -78,21 +78,11 @@ func (ini *Initializer) NewBlobVerifier(b blocks.ROBlob, reqs []Requirement) *RO
// WARNING: The returned verifier is not thread-safe, and should not be used concurrently.
func (ini *Initializer) NewDataColumnsVerifier(roDataColumns []blocks.RODataColumn, reqs []Requirement) *RODataColumnsVerifier {
return &RODataColumnsVerifier{
sharedResources: ini.shared,
dataColumns: roDataColumns,
results: newResults(reqs...),
verifyDataColumnsCommitment: func(rc []blocks.RODataColumn) error {
if len(rc) == 0 {
return nil
}
var sizeHint int
if len(rc) > 0 {
sizeHint = len(rc[0].Column)
}
sizeHint *= len(rc)
return peerdas.VerifyDataColumnsCellsKZGProofs(sizeHint, blocks.RODataColumnsToCellProofBundles(rc))
},
stateByRoot: make(map[[fieldparams.RootLength]byte]state.BeaconState),
sharedResources: ini.shared,
dataColumns: roDataColumns,
results: newResults(reqs...),
verifyDataColumnsCommitment: peerdas.VerifyDataColumnsSidecarKZGProofs,
stateByRoot: make(map[[fieldparams.RootLength]byte]state.BeaconState),
}
}

View File

@@ -0,0 +1,3 @@
### Added
- `primitives.BuilderIndex`: SSZ `uint64` wrapper for builder registry indices.

View File

@@ -0,0 +1,3 @@
### Changed
- the /eth/v2/beacon/pool/attestations and /eth/v1/beacon/pool/sync_committees now returns a 503 error if the node is still syncing, the rest api is also working in a similar process to gRPC broadcasting immediately now.

View File

@@ -0,0 +1,2 @@
#### Fixed
- Fix validation logic for `--backfill-oldest-slot`, which was rejecting slots newer than 1056767.

3
changelog/manu-agg.md Normal file
View File

@@ -0,0 +1,3 @@
### Changed
- Pending aggregates: When multiple aggregated attestations only differing by the aggregator index are in the pending queue, only process one of them.

View File

@@ -0,0 +1,3 @@
### Changed
- Data column sidecars cache warmup: Process in parallel all sidecars for a given epoch.

3
changelog/manu-log.md Normal file
View File

@@ -0,0 +1,3 @@
### Changed
- Summarize DEBUG log corresponding to incoming via gossip data column sidecar.

View File

@@ -0,0 +1,2 @@
### Changed
- `validateDataColumn`: Remove error logs.

View File

@@ -0,0 +1,2 @@
### Changed
- Use lookahead to validate data column sidecar proposer index.

View File

@@ -0,0 +1,3 @@
### Changed
- Notify the engine about forkchoice updates in the background.

View File

@@ -0,0 +1,2 @@
### Changed
- Use a separate context when updating the slot cache.

View File

@@ -0,0 +1,3 @@
### Fixed
- Do not process slots and copy states for next epoch proposers after Fulu

View File

@@ -0,0 +1,2 @@
### Ignored
- D not send FCU on block batches.

View File

@@ -0,0 +1,3 @@
### Changed
- Do not check block signature on state transition.

View File

@@ -0,0 +1,3 @@
### Added
- Use the head state to validate attestations for the previous epoch if head is compatible with the target checkpoint.

View File

@@ -0,0 +1,3 @@
### Changed
- Extend `httperror` analyzer to more functions.

View File

@@ -0,0 +1,3 @@
### Fixed
- avoid panic when fork schedule is empty [#16175](https://github.com/OffchainLabs/prysm/pull/16175)

View File

@@ -0,0 +1,3 @@
### Changed
- Performance improvement in ProcessConsolidationRequests: Use more performance HasPendingBalanceToWithdraw instead of PendingBalanceToWithdraw as no need to calculate full total pending balance.

View File

@@ -356,9 +356,4 @@ var (
Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
}
// PartialDataColumns specifies the regex for enabling partial messages on datacolumns
PartialDataColumns = &cli.BoolFlag{
Name: "partial-data-columns",
Usage: "Enable cell-level dissemination for PeerDAS data columns",
}
)

View File

@@ -156,7 +156,6 @@ var appFlags = []cli.Flag{
dasFlags.BackfillOldestSlot,
dasFlags.BlobRetentionEpochFlag,
flags.BatchVerifierLimit,
flags.PartialDataColumns,
}
func init() {

View File

@@ -74,7 +74,6 @@ var appHelpFlagGroups = []flagGroup{
flags.RPCHost,
flags.RPCPort,
flags.BatchVerifierLimit,
flags.PartialDataColumns,
},
},
{

View File

@@ -8,7 +8,6 @@ go_library(
"get_payload.go",
"getters.go",
"kzg.go",
"partialdatacolumn.go",
"proofs.go",
"proto.go",
"roblob.go",
@@ -35,12 +34,9 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//runtime/version:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_gohashtree//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
@@ -52,7 +48,6 @@ go_test(
"factory_test.go",
"getters_test.go",
"kzg_test.go",
"partialdatacolumn_invariants_test.go",
"proofs_test.go",
"proto_test.go",
"roblob_test.go",
@@ -75,9 +70,6 @@ go_test(
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_gohashtree//:go_default_library",

View File

@@ -1,244 +0,0 @@
package blocks
import (
"errors"
"github.com/OffchainLabs/go-bitfield"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/sirupsen/logrus"
)
type CellProofBundle struct {
ColumnIndex uint64
Commitment []byte
Cell []byte
Proof []byte
}
type PartialDataColumn struct {
*ethpb.DataColumnSidecar
root [fieldparams.RootLength]byte
groupID []byte
Included bitfield.Bitlist
// Parts we've received before we have any commitments to validate against.
// Happens when a peer eager pushes to us.
// TODO implement. For now, not bothering to handle the eager pushes.
// quarantine []*ethpb.PartialDataColumnSidecar
}
// const quarantineSize = 3
// NewPartialDataColumn creates a new Partial Data Column for the given block.
// It does not validate the inputs. The caller is responsible for validating the
// block header and KZG Commitment Inclusion proof.
func NewPartialDataColumn(
signedBlockHeader *ethpb.SignedBeaconBlockHeader,
columnIndex uint64,
kzgCommitments [][]byte,
kzgInclusionProof [][]byte,
) (PartialDataColumn, error) {
root, err := signedBlockHeader.Header.HashTreeRoot()
if err != nil {
return PartialDataColumn{}, err
}
sidecar := &ethpb.DataColumnSidecar{
Index: columnIndex,
KzgCommitments: kzgCommitments,
Column: make([][]byte, len(kzgCommitments)),
KzgProofs: make([][]byte, len(kzgCommitments)),
SignedBlockHeader: signedBlockHeader,
KzgCommitmentsInclusionProof: kzgInclusionProof,
}
groupID := make([]byte, len(root)+1)
copy(groupID[1:], root[:])
// Version 0
groupID[0] = 0
c := PartialDataColumn{
DataColumnSidecar: sidecar,
root: root,
groupID: groupID,
Included: bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
}
if len(c.Column) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of cells and commitments")
}
if len(c.KzgProofs) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of proofs and commitments")
}
for i := range len(c.KzgCommitments) {
if sidecar.Column[i] == nil {
continue
}
c.Included.SetBitAt(uint64(i), true)
}
return c, nil
}
func (p *PartialDataColumn) GroupID() []byte {
return p.groupID
}
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
peerHas := bitfield.Bitlist(metadata)
if peerHas.Len() != p.Included.Len() {
return nil, errors.New("metadata length does not match expected length")
}
var cellsToReturn int
for i := range peerHas.Len() {
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
cellsToReturn++
}
}
if cellsToReturn == 0 {
return nil, nil
}
included := bitfield.NewBitlist(p.Included.Len())
outMessage := ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: included,
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
}
for i := range peerHas.Len() {
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
continue
}
included.SetBitAt(i, true)
outMessage.PartialColumn = append(outMessage.PartialColumn, p.Column[i])
outMessage.KzgProofs = append(outMessage.KzgProofs, p.KzgProofs[i])
}
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, err
}
return marshalled, nil
}
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
// Eagerly push the PartialDataColumnHeader
outHeader := &ethpb.PartialDataColumnHeader{
KzgCommitments: p.KzgCommitments,
SignedBlockHeader: p.SignedBlockHeader,
KzgCommitmentsInclusionProof: p.KzgCommitmentsInclusionProof,
}
outMessage := &ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: bitfield.NewBitlist(uint64(len(p.KzgCommitments))),
Header: []*ethpb.PartialDataColumnHeader{outHeader},
}
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, nil, err
}
// Empty bitlist since we aren't including any cells here
peersNextParts := partialmessages.PartsMetadata(bitfield.NewBitlist(uint64(len(p.KzgCommitments))))
return marshalled, peersNextParts, nil
}
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
return partialmessages.PartsMetadata(p.Included)
}
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
func (p *PartialDataColumn) CellsToVerifyFromPartialMessage(message *ethpb.PartialDataColumnSidecar) ([]uint64, []CellProofBundle, error) {
included := message.CellsPresentBitmap
if included.Len() == 0 {
return nil, nil, nil
}
// Some basic sanity checks
includedCells := included.Count()
if uint64(len(message.KzgProofs)) != includedCells {
return nil, nil, errors.New("invalid message. Missing KZG proofs")
}
if uint64(len(message.PartialColumn)) != includedCells {
return nil, nil, errors.New("invalid message. Missing cells")
}
ourIncludedList := p.Included
if included.Len() != ourIncludedList.Len() {
return nil, nil, errors.New("invalid message. Wrong bitmap length.")
}
cellIndices := make([]uint64, 0, includedCells)
cellsToVerify := make([]CellProofBundle, 0, includedCells)
// Filter out cells we already have
for i := range included.Len() {
if len(message.PartialColumn) == 0 {
break
}
if !included.BitAt(i) {
continue
}
if !ourIncludedList.BitAt(i) {
cellIndices = append(cellIndices, i)
cellsToVerify = append(cellsToVerify, CellProofBundle{
ColumnIndex: p.Index,
Cell: message.PartialColumn[0],
Proof: message.KzgProofs[0],
// Use the commitment from our datacolumn, indexed by i since we
// have all commitments.
Commitment: p.KzgCommitments[i],
})
}
message.PartialColumn = message.PartialColumn[1:]
message.KzgProofs = message.KzgProofs[1:]
}
return cellIndices, cellsToVerify, nil
}
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCell(cellIndex uint64, cell, proof []byte) bool {
if p.Included.BitAt(cellIndex) {
// We already have this cell
return false
}
p.Included.SetBitAt(cellIndex, true)
p.Column[cellIndex] = cell
p.KzgProofs[cellIndex] = proof
return true
}
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCells(cellIndices []uint64, cells []CellProofBundle) /* extended */ bool {
var extended bool
for i, bundle := range cells {
if bundle.ColumnIndex != p.Index {
// Invalid column index, shouldn't happen
return false
}
if p.ExtendFromVerfifiedCell(cellIndices[i], bundle.Cell, bundle.Proof) {
extended = true
}
}
return extended
}
func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
if uint64(len(p.KzgCommitments)) != p.Included.Count() {
return VerifiedRODataColumn{}, false
}
rodc, err := NewRODataColumn(p.DataColumnSidecar)
if err != nil {
// We shouldn't get an error, as we check the hash root when creating
// the partial column
logger.Error("failed to create RODataColumn", "err", err)
return VerifiedRODataColumn{}, false
}
return NewVerifiedRODataColumn(rodc), true
}

Some files were not shown because too many files have changed in this diff Show More