Compare commits

..

48 Commits

Author SHA1 Message Date
Aarsh Shah
bfd9ff8651 fix lint 2026-02-03 18:10:35 +04:00
Aarsh Shah
79301d4db6 fix bazel 2026-02-03 18:03:26 +04:00
Aarsh Shah
fbfea6f753 fix log files CI error 2026-02-03 17:54:41 +04:00
Aarsh Shah
562ef25527 fix CI 2026-02-03 17:47:40 +04:00
Aarsh Shah
488971f989 fix CI 2026-02-03 17:44:33 +04:00
Aarsh Shah
b129eaaeb8 fix lint 2026-02-03 17:01:45 +04:00
Aarsh Shah
90302adbd2 fix CI 2026-02-03 16:17:36 +04:00
Aarsh Shah
a6262ba07b go mod tidy and partial columns 2026-02-03 15:23:59 +04:00
Aarsh Shah
7d5c8d6964 Merge branch 'develop' into rebased-partial-columns 2026-02-03 10:20:18 +04:00
Preston Van Loon
d1b9281677 golangci-lint: Remove test exclusion from formatting (#16318)
**What type of PR is this?**

> Other

**What does this PR do? Why is it needed?**

**Which issues(s) does this PR fix?**

Follow up to #16311

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-02-02 17:42:05 +00:00
Marco Munizaga
38cae3f8de add todos from call
this came from a call with Aarsh and Kasey
2026-02-02 11:05:40 -06:00
Marco Munizaga
a5e6d0a3ac Implement gossipsub_mesh_peers metric 2026-02-02 11:05:40 -06:00
Marco Munizaga
c4a308e598 Track number of peers in mesh
along with tracking which peers request partial messages
2026-02-02 11:05:40 -06:00
Marco Munizaga
ba720c1b4b update go-libp2p-pubsub with new tracer
the new tracer interface provides the peer ID in tracer.RecvRPC.
2026-02-02 11:05:40 -06:00
Marco Munizaga
5a3f45f91f add todo 2026-02-02 11:05:40 -06:00
Marco Munizaga
29010edcd1 return valid if there are no datacolumns to validate 2026-02-02 11:05:40 -06:00
Marco Munizaga
045c29ccce cache partial data column header by group ID 2026-02-02 11:05:40 -06:00
Marco Munizaga
0d80bbe44f Add partial message metrics 2026-02-02 11:05:40 -06:00
Marco Munizaga
14f13ed902 more context around errors 2026-02-02 11:05:40 -06:00
Marco Munizaga
308199c2e7 fix test typo 2026-02-02 11:05:40 -06:00
Marco Munizaga
7a04c6b645 eagerly push the partial data column header 2026-02-02 11:05:40 -06:00
Marco Munizaga
93cda45e18 Include the version byte in the group ID 2026-02-02 11:05:40 -06:00
Marco Munizaga
1b4265ef3f add partial data column header 2026-02-02 11:05:40 -06:00
Marco Munizaga
90d1716fd7 Update go-libp2p-pubsub 2026-02-02 11:05:40 -06:00
Marco Munizaga
c4f1a9ac4f add todo 2026-02-02 11:05:40 -06:00
Marco Munizaga
73d948a710 add partial-data-columns flag 2026-02-02 11:05:38 -06:00
Marco Munizaga
5e2985d36b beacon-chain/sync: subscribe to partial columns 2026-02-02 11:04:53 -06:00
Marco Munizaga
7ac3f3cb68 publish partial columns when proposing a block 2026-02-02 11:04:53 -06:00
Marco Munizaga
0a759c3d15 beacon-chain/execution: return partial columns and use getBlobsV3
... if available
2026-02-02 11:04:53 -06:00
Marco Munizaga
bf7ca00780 core/peerdas: Add PartialColumns helper 2026-02-02 11:04:52 -06:00
Marco Munizaga
efcb98bcaa core/peerdas: support partial responses 2026-02-02 11:04:52 -06:00
Marco Munizaga
7b0de5ad0e beacon-chain/p2p: own and start PartialColumnBroadcaster 2026-02-02 11:04:52 -06:00
Marco Munizaga
2e15cd2068 Add metrics for gossipsub message sizes 2026-02-02 11:04:52 -06:00
Marco Munizaga
2e5192e496 Implement PartialColumnBroadcaster 2026-02-02 11:04:52 -06:00
Marco Munizaga
729c54a300 refactor DataColumn Cell KZG Proof verification 2026-02-02 11:04:52 -06:00
Marco Munizaga
4556aa266a avoid needless copy 2026-02-02 11:04:52 -06:00
Marco Munizaga
2f1cac217d Add PartialDataColumn type 2026-02-02 11:04:52 -06:00
Marco Munizaga
6bfc779ea1 proto: Add PartialDataColumnSidecar 2026-02-02 11:04:52 -06:00
Marco Munizaga
90481d6aa8 clone slice in testing util 2026-02-02 11:04:52 -06:00
Marco Munizaga
9f64007dc1 logrusadapter for slog 2026-02-02 11:04:52 -06:00
Marco Munizaga
879ea624ec fix multiaddr comparison 2026-02-02 11:04:52 -06:00
Marco Munizaga
79841f451c deps: update libp2p deps
for partial message support and simnet support
2026-02-02 11:04:52 -06:00
james-prysm
641d90990d grpc fallback improvements (#16215)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

## Summary

This PR implements gRPC fallback support for the validator client,
allowing it to automatically switch between multiple beacon node
endpoints when the primary node becomes unavailable or unhealthy.

## Changes

- Added `grpcConnectionProvider` to manage multiple gRPC connections
with circular failover
- Validator automatically detects unhealthy beacon nodes and switches to
the next available endpoint
- Health checks verify both node responsiveness AND sync status before
accepting a node
- Improved logging to only show "Found fully synced beacon node" when an
actual switch occurs (reduces log noise)


I removed the old middleware that uses gRPC's built in load balancer
because:

- gRPC's pick_first load balancer doesn't provide sync-status-aware
failover
- The validator needs to ensure it connects to a fully synced node, not
just a reachable one

## Test Scenario

### Setup
Deployed a 4-node Kurtosis testnet with local validator connecting to 2
beacon nodes:

```yaml
# kurtosis-grpc-fallback-test.yaml
participants:
  - el_type: nethermind
    cl_type: prysm
    validator_count: 128  # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing

network_params:
  fulu_fork_epoch: 0
  seconds_per_slot: 6
```

Local validator started with:
```bash
./validator --beacon-rpc-provider=127.0.0.1:33005,127.0.0.1:33012 ...
```

### Test 1: Primary Failover (cl-1 → cl-2)

1. Stopped cl-1 beacon node
2. Validator detected failure and switched to cl-2

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33005 nextHost=127.0.0.1:33012
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33005] newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
```

**Result:**  PASSED - Validator continued submitting attestations on
cl-2

### Test 2: Circular Failover (cl-2 → cl-1)

1. Restarted cl-1, stopped cl-2
2. Validator detected failure and switched back to cl-1

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33012 nextHost=127.0.0.1:33005
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33012] newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
```

**Result:**  PASSED - Circular fallback works correctly

## Key Log Messages

| Log Level | Message | Source |
|-----------|---------|--------|
| WARN | "Beacon node is not responding, switching host" |
`changeHost()` in validator.go |
| INFO | "Switched gRPC endpoint" | `SetHost()` in
grpc_connection_provider.go |
| INFO | "Found fully synced beacon node" | `FindHealthyHost()` in
validator.go (only on actual switch) |

## Test Plan

- [x] Verify primary failover (cl-1 → cl-2)
- [x] Verify circular failover (cl-2 → cl-1)
- [x] Verify validator continues producing attestations after switch
- [x] Verify "Found fully synced beacon node" only logs on actual switch
(not every health check)

**What does this PR do? Why is it needed?**

**Which issues(s) does this PR fix?**

Fixes # https://github.com/OffchainLabs/prysm/pull/7133


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-02 14:51:56 +00:00
terence
d2fc250f34 Run go fmt (#16311)
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-02 14:19:15 +00:00
Jun Song
571c6f39aa Add docs for SSZ Query package (#16299)
**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

Although godoc and comments are well-written in `encoding/ssz/query`
package, we (@rkapka, @fernantho, @syjn99)
[agreed](https://discord.com/channels/476244492043812875/1387734369527136297/1466075406523174944)
that it would be great to have human-readable documentation.

**Which issues(s) does this PR fix?**

Part of  #15587 & #15598 

**Other notes for review**

This documentation is first drafted by Claude Code, and then has a few
rounds of self-review.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: fernantho <fernantho1@gmail.com>
Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>
2026-02-01 03:39:53 +00:00
Justin Traglia
55fe85c887 Add ability to download nightly tests from a specific night (#16298)
**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

This PR allows devs to test against a specific run of the nightly
reference test generator.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-29 21:38:13 +00:00
Justin Traglia
31f77567dd Add a README for specrefs (#16302)
**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

This PR adds a basic README for the specrefs.


**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2026-01-29 20:36:29 +00:00
terence
a7fdd11777 gloas: sample PTC per committee (#16293)
This PR updates `get_ptc` construction to sample ptc
committee-by-committee instead of concatenating all beacon committees
into a large slice. No functional changes to payload attestation
verification
2026-01-29 14:21:54 +00:00
209 changed files with 5509 additions and 5398 deletions

View File

@@ -2,7 +2,7 @@ name: Go
on:
push:
branches: [ master ]
branches: [ master, develop ]
pull_request:
branches: [ '*' ]
merge_group:

View File

@@ -33,9 +33,8 @@ formatters:
generated: lax
paths:
- validator/web/site_data.go
- .*_test.go
- proto
- tools/analyzers
- third_party$
- builtin$
- examples$
- examples$

View File

@@ -3,13 +3,16 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"grpc_connection_provider.go",
"grpcutils.go",
"log.go",
"mock_grpc_provider.go",
"parameters.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/grpc",
visibility = ["//visibility:public"],
deps = [
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
@@ -18,12 +21,17 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["grpcutils_test.go"],
srcs = [
"grpc_connection_provider_test.go",
"grpcutils_test.go",
],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],
)

View File

@@ -0,0 +1,173 @@
package grpc
import (
"context"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// GrpcConnectionProvider manages gRPC connections for failover support.
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
// Only one connection is maintained at a time - when switching hosts, the old connection is closed.
type GrpcConnectionProvider interface {
// CurrentConn returns the currently active gRPC connection.
// The connection is created lazily on first call.
// Returns nil if the provider has been closed.
CurrentConn() *grpc.ClientConn
// CurrentHost returns the address of the currently active endpoint.
CurrentHost() string
// Hosts returns all configured endpoint addresses.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
// The new connection is created lazily on next CurrentConn() call.
SwitchHost(index int) error
// Close closes the current connection.
Close()
}
type grpcConnectionProvider struct {
// Immutable after construction - no lock needed for reads
endpoints []string
ctx context.Context
dialOpts []grpc.DialOption
// Current connection state (protected by mutex)
currentIndex uint64
conn *grpc.ClientConn
mu sync.Mutex
closed bool
}
// NewGrpcConnectionProvider creates a new connection provider that manages gRPC connections.
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
// Only one connection is maintained at a time, created lazily on first use.
func NewGrpcConnectionProvider(
ctx context.Context,
endpoint string,
dialOpts []grpc.DialOption,
) (GrpcConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no gRPC endpoints provided")
}
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized gRPC connection provider")
return &grpcConnectionProvider{
endpoints: endpoints,
ctx: ctx,
dialOpts: dialOpts,
}, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
// Return existing connection if available
if p.conn != nil {
return p.conn
}
// Create connection lazily
ep := p.endpoints[p.currentIndex]
conn, err := grpc.DialContext(p.ctx, ep, p.dialOpts...)
if err != nil {
log.WithError(err).WithField("endpoint", ep).Error("Failed to create gRPC connection")
return nil
}
p.conn = conn
log.WithField("endpoint", ep).Debug("Created gRPC connection")
return conn
}
func (p *grpcConnectionProvider) CurrentHost() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.endpoints[p.currentIndex]
}
func (p *grpcConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *grpcConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
p.mu.Lock()
defer p.mu.Unlock()
if uint64(index) == p.currentIndex {
return nil // Already on this host
}
oldHost := p.endpoints[p.currentIndex]
oldConn := p.conn
p.conn = nil // Clear immediately - new connection created lazily
p.currentIndex = uint64(index)
// Close old connection asynchronously to avoid blocking the caller
if oldConn != nil {
go func() {
if err := oldConn.Close(); err != nil {
log.WithError(err).WithField("endpoint", oldHost).Debug("Failed to close previous connection")
}
}()
}
log.WithFields(logrus.Fields{
"previousHost": oldHost,
"newHost": p.endpoints[index],
}).Debug("Switched gRPC endpoint")
return nil
}
func (p *grpcConnectionProvider) Close() {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return
}
p.closed = true
if p.conn != nil {
if err := p.conn.Close(); err != nil {
log.WithError(err).WithField("endpoint", p.endpoints[p.currentIndex]).Debug("Failed to close gRPC connection")
}
p.conn = nil
}
}

View File

@@ -0,0 +1,207 @@
package grpc
import (
"context"
"net"
"reflect"
"strings"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "localhost:4000", []string{"localhost:4000"}},
{"multiple endpoints", "host1:4000,host2:4000,host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"endpoints with spaces", "host1:4000, host2:4000 , host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "host1:4000,host2:4000,", []string{"host1:4000", "host2:4000"}},
{"leading comma", ",host1:4000,host2:4000", []string{"host1:4000", "host2:4000"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewGrpcConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
_, err := NewGrpcConnectionProvider(context.Background(), "", dialOpts)
require.ErrorContains(t, "no gRPC endpoints provided", err)
})
}
func TestGrpcConnectionProvider_LazyConnection(t *testing.T) {
// Start only one server but configure provider with two endpoints
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
defer server.Stop()
validAddr := lis.Addr().String()
invalidAddr := "127.0.0.1:1" // Port 1 is unlikely to be listening
// Provider should succeed even though second endpoint is invalid (lazy connections)
endpoint := validAddr + "," + invalidAddr
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err, "Provider creation should succeed with lazy connections")
defer func() { provider.Close() }()
// First endpoint should work
conn := provider.CurrentConn()
assert.NotNil(t, conn, "First connection should be created lazily")
}
func TestGrpcConnectionProvider_SingleConnectionModel(t *testing.T) {
// Create provider with 3 endpoints
var addrs []string
var servers []*grpc.Server
for range 3 {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
servers = append(servers, server)
}
defer func() {
for _, s := range servers {
s.Stop()
}
}()
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
defer func() { provider.Close() }()
// Access the internal state to verify single connection behavior
p := provider.(*grpcConnectionProvider)
// Initially no connection
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil before access")
p.mu.Unlock()
// Access connection - should create one
conn0 := provider.CurrentConn()
assert.NotNil(t, conn0)
p.mu.Lock()
assert.NotNil(t, p.conn, "Connection should be created after CurrentConn()")
firstConn := p.conn
p.mu.Unlock()
// Call CurrentConn again - should return same connection
conn0Again := provider.CurrentConn()
assert.Equal(t, conn0, conn0Again, "Should return same connection")
// Switch to different host - old connection should be closed, new one created lazily
require.NoError(t, provider.SwitchHost(1))
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil after SwitchHost (lazy)")
p.mu.Unlock()
// Get new connection
conn1 := provider.CurrentConn()
assert.NotNil(t, conn1)
assert.NotEqual(t, firstConn, conn1, "Should be a different connection after switching hosts")
}
// testProvider creates a provider with n test servers and returns cleanup function.
func testProvider(t *testing.T, n int) (GrpcConnectionProvider, []string, func()) {
var addrs []string
var cleanups []func()
for range n {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
cleanups = append(cleanups, server.Stop)
}
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
cleanup := func() {
provider.Close()
for _, c := range cleanups {
c()
}
}
return provider, addrs, cleanup
}
func TestGrpcConnectionProvider(t *testing.T) {
provider, addrs, cleanup := testProvider(t, 3)
defer cleanup()
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, addrs[0], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, addrs[1], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn()) // New connection created lazily
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, addrs[0], provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("SwitchHost circular", func(t *testing.T) {
// Test round-robin style switching using SwitchHost with manual index
indices := []int{1, 2, 0, 1} // Simulate circular switching
for i, idx := range indices {
require.NoError(t, provider.SwitchHost(idx))
assert.Equal(t, addrs[idx], provider.CurrentHost(), "iteration %d", i)
}
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestGrpcConnectionProvider_Close(t *testing.T) {
provider, _, cleanup := testProvider(t, 1)
defer cleanup()
assert.NotNil(t, provider.CurrentConn())
provider.Close()
assert.Equal(t, (*grpc.ClientConn)(nil), provider.CurrentConn())
provider.Close() // Double close is safe
}

View File

@@ -0,0 +1,20 @@
package grpc
import "google.golang.org/grpc"
// MockGrpcProvider implements GrpcConnectionProvider for testing.
type MockGrpcProvider struct {
MockConn *grpc.ClientConn
MockHosts []string
}
func (m *MockGrpcProvider) CurrentConn() *grpc.ClientConn { return m.MockConn }
func (m *MockGrpcProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[0]
}
return ""
}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(int) error { return nil }
func (m *MockGrpcProvider) Close() {}

34
api/rest/BUILD.bazel Normal file
View File

@@ -0,0 +1,34 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"mock_rest_provider.go",
"rest_connection_provider.go",
"rest_handler.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/rest",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library",
"//runtime/version:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["rest_connection_provider_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

9
api/rest/log.go Normal file
View File

@@ -0,0 +1,9 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package rest
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "api/rest")

View File

@@ -0,0 +1,49 @@
package rest
import (
"bytes"
"context"
"net/http"
)
// MockRestProvider implements RestConnectionProvider for testing.
type MockRestProvider struct {
MockClient *http.Client
MockHandler RestHandler
MockHosts []string
HostIndex int
}
func (m *MockRestProvider) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestProvider) RestHandler() RestHandler { return m.MockHandler }
func (m *MockRestProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[m.HostIndex%len(m.MockHosts)]
}
return ""
}
func (m *MockRestProvider) Hosts() []string { return m.MockHosts }
func (m *MockRestProvider) SwitchHost(index int) error { m.HostIndex = index; return nil }
// MockRestHandler implements RestHandler for testing.
type MockRestHandler struct {
MockHost string
MockClient *http.Client
}
func (m *MockRestHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockRestHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
return http.StatusOK, nil
}
func (m *MockRestHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
return nil
}
func (m *MockRestHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestHandler) Host() string { return m.MockHost }
func (m *MockRestHandler) SwitchHost(host string) { m.MockHost = host }

View File

@@ -0,0 +1,158 @@
package rest
import (
"net/http"
"strings"
"sync/atomic"
"time"
"github.com/OffchainLabs/prysm/v7/api/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// RestConnectionProvider manages HTTP client configuration for REST API with failover support.
// It allows switching between different beacon node REST endpoints when the current one becomes unavailable.
type RestConnectionProvider interface {
// HttpClient returns the configured HTTP client with headers, timeout, and optional tracing.
HttpClient() *http.Client
// RestHandler returns the REST handler for making API requests.
RestHandler() RestHandler
// CurrentHost returns the current REST API endpoint URL.
CurrentHost() string
// Hosts returns all configured REST API endpoint URLs.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
SwitchHost(index int) error
}
// RestConnectionProviderOption is a functional option for configuring the REST connection provider.
type RestConnectionProviderOption func(*restConnectionProvider)
// WithHttpTimeout sets the HTTP client timeout.
func WithHttpTimeout(timeout time.Duration) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.timeout = timeout
}
}
// WithHttpHeaders sets custom HTTP headers to include in all requests.
func WithHttpHeaders(headers map[string][]string) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.headers = headers
}
}
// WithTracing enables OpenTelemetry tracing for HTTP requests.
func WithTracing() RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.enableTracing = true
}
}
type restConnectionProvider struct {
endpoints []string
httpClient *http.Client
restHandler RestHandler
currentIndex atomic.Uint64
timeout time.Duration
headers map[string][]string
enableTracing bool
}
// NewRestConnectionProvider creates a new REST connection provider that manages HTTP client configuration.
// The endpoint parameter can be a comma-separated list of URLs (e.g., "http://host1:3500,http://host2:3500").
func NewRestConnectionProvider(endpoint string, opts ...RestConnectionProviderOption) (RestConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no REST API endpoints provided")
}
p := &restConnectionProvider{
endpoints: endpoints,
}
for _, opt := range opts {
opt(p)
}
// Build the HTTP transport chain
var transport http.RoundTripper = http.DefaultTransport
// Add custom headers if configured
if len(p.headers) > 0 {
transport = client.NewCustomHeadersTransport(transport, p.headers)
}
// Add tracing if enabled
if p.enableTracing {
transport = otelhttp.NewTransport(transport)
}
p.httpClient = &http.Client{
Timeout: p.timeout,
Transport: transport,
}
// Create the REST handler with the HTTP client and initial host
p.restHandler = newRestHandler(*p.httpClient, endpoints[0])
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized REST connection provider")
return p, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *restConnectionProvider) HttpClient() *http.Client {
return p.httpClient
}
func (p *restConnectionProvider) RestHandler() RestHandler {
return p.restHandler
}
func (p *restConnectionProvider) CurrentHost() string {
return p.endpoints[p.currentIndex.Load()]
}
func (p *restConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *restConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
oldIdx := p.currentIndex.Load()
p.currentIndex.Store(uint64(index))
// Update the rest handler's host
p.restHandler.SwitchHost(p.endpoints[index])
log.WithFields(logrus.Fields{
"previousHost": p.endpoints[oldIdx],
"newHost": p.endpoints[index],
}).Debug("Switched REST endpoint")
return nil
}

View File

@@ -0,0 +1,80 @@
package rest
import (
"reflect"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "http://localhost:3500", []string{"http://localhost:3500"}},
{"multiple endpoints", "http://host1:3500,http://host2:3500,http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"endpoints with spaces", "http://host1:3500, http://host2:3500 , http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "http://host1:3500,http://host2:3500,", []string{"http://host1:3500", "http://host2:3500"}},
{"leading comma", ",http://host1:3500,http://host2:3500", []string{"http://host1:3500", "http://host2:3500"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewRestConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
_, err := NewRestConnectionProvider("")
require.ErrorContains(t, "no REST API endpoints provided", err)
})
}
func TestRestConnectionProvider(t *testing.T) {
provider, err := NewRestConnectionProvider("http://host1:3500,http://host2:3500,http://host3:3500")
require.NoError(t, err)
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
assert.NotNil(t, provider.HttpClient())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, "http://host2:3500", provider.CurrentHost())
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestRestConnectionProvider_WithOptions(t *testing.T) {
headers := map[string][]string{"Authorization": {"Bearer token"}}
provider, err := NewRestConnectionProvider(
"http://localhost:3500",
WithHttpHeaders(headers),
WithHttpTimeout(30000000000), // 30 seconds in nanoseconds
WithTracing(),
)
require.NoError(t, err)
assert.NotNil(t, provider.HttpClient())
assert.Equal(t, "http://localhost:3500", provider.CurrentHost())
}

View File

@@ -1,4 +1,4 @@
package beacon_api
package rest
import (
"bytes"
@@ -21,6 +21,7 @@ import (
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
@@ -29,29 +30,34 @@ type RestHandler interface {
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SetHost(host string)
SwitchHost(host string)
}
type BeaconApiRestHandler struct {
type restHandler struct {
client http.Client
host string
reqOverrides []reqOption
}
// NewBeaconApiRestHandler returns a RestHandler
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
brh := &BeaconApiRestHandler{
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
client: client,
host: host,
}
brh.appendAcceptOverride()
return brh
rh.appendAcceptOverride()
return rh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *BeaconApiRestHandler) appendAcceptOverride() {
func (c *restHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
@@ -60,18 +66,18 @@ func (c *BeaconApiRestHandler) appendAcceptOverride() {
}
// HttpClient returns the underlying HTTP client of the handler
func (c *BeaconApiRestHandler) HttpClient() *http.Client {
func (c *restHandler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *BeaconApiRestHandler) Host() string {
func (c *restHandler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -94,7 +100,7 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp an
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -113,7 +119,7 @@ func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint strin
return httpResp.StatusCode, nil
}
func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -168,7 +174,7 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *BeaconApiRestHandler) Post(
func (c *restHandler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -204,7 +210,7 @@ func (c *BeaconApiRestHandler) Post(
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *BeaconApiRestHandler) PostSSZ(
func (c *restHandler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -305,6 +311,6 @@ func decodeResp(httpResp *http.Response, resp any) error {
return nil
}
func (c *BeaconApiRestHandler) SetHost(host string) {
func (c *restHandler) SwitchHost(host string) {
c.host = host
}

View File

@@ -27,7 +27,6 @@ go_library(
"receive_blob.go",
"receive_block.go",
"receive_data_column.go",
"receive_proof.go",
"service.go",
"setup_forkchoice.go",
"tracked_proposer.go",
@@ -50,7 +49,6 @@ go_library(
"//beacon-chain/core/electra:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
@@ -76,7 +74,6 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",

View File

@@ -110,7 +110,7 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
ckzgCells := make([]ckzg4844.Cell, len(cells))
for i := range cells {
copy(ckzgCells[i][:], cells[i][:])
ckzgCells[i] = ckzg4844.Cell(cells[i])
}
return ckzg4844.VerifyCellKZGProofBatch(commitmentsBytes, cellIndices, ckzgCells, proofsBytes)
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/OffchainLabs/prysm/v7/async/event"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem"
@@ -227,14 +226,6 @@ func WithDataColumnStorage(b *filesystem.DataColumnStorage) Option {
}
}
// WithProofStorage sets the proof storage backend for the blockchain service.
func WithProofStorage(p *filesystem.ProofStorage) Option {
return func(s *Service) error {
s.proofStorage = p
return nil
}
}
// WithSyncChecker sets the sync checker for the blockchain service.
func WithSyncChecker(checker Checker) Option {
return func(s *Service) error {
@@ -275,10 +266,3 @@ func WithStartWaitingDataColumnSidecars(c chan bool) Option {
return nil
}
}
func WithOperationNotifier(operationNotifier operation.Notifier) Option {
return func(s *Service) error {
s.cfg.OperationNotifier = operationNotifier
return nil
}
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem"
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -114,7 +113,6 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
s.updateCachesPostBlockProcessing(cfg)
}()
}
return nil
}
@@ -663,17 +661,10 @@ func (s *Service) isDataAvailable(
return errors.New("invalid nil beacon block")
}
root, blockVersion := roBlock.Root(), roBlock.Version()
root := roBlock.Root()
blockVersion := block.Version()
if blockVersion >= version.Fulu {
if err := s.areExecutionProofsAvailable(ctx, roBlock); err != nil {
return fmt.Errorf("are execution proofs available: %w", err)
}
if err := s.areDataColumnsAvailable(ctx, root, block); err != nil {
return fmt.Errorf("are data columns available: %w", err)
}
return nil
return s.areDataColumnsAvailable(ctx, root, block)
}
if blockVersion >= version.Deneb {
@@ -683,77 +674,6 @@ func (s *Service) isDataAvailable(
return nil
}
// areExecutionProofsAvailable blocks until we have enough execution proofs to import the block,
// or an error or context cancellation occurs.
// This check is only performed for lightweight verifier nodes that need zkVM proofs
// to validate block execution (nodes without execution layer + proof generation capability).
// A nil result means that the data availability check is successful.
func (s *Service) areExecutionProofsAvailable(ctx context.Context, roBlock consensusblocks.ROBlock) error {
// Return early if zkVM features are disabled (no need to check for execution proofs),
// or if the generation proof is enabled (we will generate proofs ourselves).
if !features.Get().EnableZkvm || len(flags.Get().ProofGenerationTypes) > 0 {
return nil
}
root, slot := roBlock.Root(), roBlock.Block().Slot()
requiredProofCount := params.BeaconConfig().MinProofsRequired
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": slot,
"requiredProofCount": requiredProofCount,
})
// Subscribe to newly execution proofs stored in the database.
subscription, identChan := s.proofStorage.Subscribe()
defer subscription.Unsubscribe()
// Return early if we already have enough proofs.
if actualProofCount := uint64(s.proofStorage.Summary(root).Count()); actualProofCount >= requiredProofCount {
log.WithField("actualProofCount", actualProofCount).Debug("Already have enough execution proofs")
return nil
}
// Log for DA checks that cross over into the next slot; helpful for debugging.
nextSlot, err := slots.StartTime(s.genesisTime, roBlock.Block().Slot()+1)
if err != nil {
return fmt.Errorf("start time: %w", err)
}
// Avoid logging if DA check is called after next slot start.
if nextSlot.After(time.Now()) {
timer := time.AfterFunc(time.Until(nextSlot), func() {
actualCount := uint64(s.proofStorage.Summary(root).Count())
if actualCount >= requiredProofCount {
return
}
log.WithField("proofsRetrieved", actualCount).Warning("Execution proofs still missing at slot end")
})
defer timer.Stop()
}
// Some proofs are missing; wait for them.
for {
select {
case <-ctx.Done():
return ctx.Err()
case proofIdent := <-identChan:
// Skip if the proof is for a different block.
if proofIdent.BlockRoot != root {
continue
}
// Return if we have enough proofs.
if actualProofCount := uint64(s.proofStorage.Summary(root).Count()); actualProofCount >= requiredProofCount {
log.WithField("actualProofCount", actualProofCount).Debug("Got enough execution proofs")
return nil
}
}
}
}
// areDataColumnsAvailable blocks until all data columns committed to in the block are available,
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
func (s *Service) areDataColumnsAvailable(
@@ -890,7 +810,14 @@ func (s *Service) areDataColumnsAvailable(
}
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "data column sidecars slot: %d, BlockRoot: %#x, missing: %v", block.Slot(), root, helpers.SortedPrettySliceFromMap(missing))
var missingIndices any = "all"
missingIndicesCount := len(missing)
if missingIndicesCount < fieldparams.NumberOfColumns {
missingIndices = helpers.SortedPrettySliceFromMap(missing)
}
return errors.Wrapf(ctx.Err(), "data column sidecars slot: %d, BlockRoot: %#x, missing: %v", block.Slot(), root, missingIndices)
}
}
}

View File

@@ -60,12 +60,6 @@ type DataColumnReceiver interface {
ReceiveDataColumns([]blocks.VerifiedRODataColumn) error
}
// ProofReceiver interface defines the methods of chain service for receiving new
// execution proofs
type ProofReceiver interface {
ReceiveProof(proof *ethpb.ExecutionProof) error
}
// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
type SlashingReceiver interface {
ReceiveAttesterSlashing(ctx context.Context, slashing ethpb.AttSlashing)

View File

@@ -1,15 +0,0 @@
package blockchain
import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
)
// ReceiveProof saves an execution proof to storage.
func (s *Service) ReceiveProof(proof *ethpb.ExecutionProof) error {
if err := s.proofStorage.Save([]*ethpb.ExecutionProof{proof}); err != nil {
return errors.Wrap(err, "save proof")
}
return nil
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/OffchainLabs/prysm/v7/async/event"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
@@ -65,7 +64,6 @@ type Service struct {
blockBeingSynced *currentlySyncingBlock
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
proofStorage *filesystem.ProofStorage
slasherEnabled bool
lcStore *lightClient.Store
startWaitingDataColumnSidecars chan bool // for testing purposes only
@@ -88,7 +86,6 @@ type config struct {
P2P p2p.Accessor
MaxRoutines int
StateNotifier statefeed.Notifier
OperationNotifier operation.Notifier
ForkChoiceStore f.ForkChoicer
AttService *attestations.Service
StateGen *stategen.State
@@ -212,8 +209,7 @@ func (s *Service) Start() {
if err := s.StartFromSavedState(s.cfg.FinalizedStateAtStartUp); err != nil {
log.Fatal(err)
}
go s.spawnProcessAttestationsRoutine()
s.spawnProcessAttestationsRoutine()
go s.runLateBlockTasks()
}

View File

@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -75,7 +75,6 @@ type ChainService struct {
SyncingRoot [32]byte
Blobs []blocks.VerifiedROBlob
DataColumns []blocks.VerifiedRODataColumn
Proofs []*ethpb.ExecutionProof
TargetRoot [32]byte
MockHeadSlot *primitives.Slot
}
@@ -758,12 +757,6 @@ func (c *ChainService) ReceiveDataColumns(dcs []blocks.VerifiedRODataColumn) err
return nil
}
// ReceiveProof implements the same method in chain service
func (c *ChainService) ReceiveProof(proof *ethpb.ExecutionProof) error {
c.Proofs = append(c.Proofs, proof)
return nil
}
// DependentRootForEpoch mocks the same method in the chain service
func (c *ChainService) DependentRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) {
return c.TargetRoot, nil

View File

@@ -46,9 +46,6 @@ const (
// DataColumnReceived is sent after a data column has been seen after gossip validation rules.
DataColumnReceived = 12
// ExecutionProofReceived is sent after a execution proof object has been received from gossip or rpc.
ExecutionProofReceived = 13
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -80,11 +77,6 @@ type BLSToExecutionChangeReceivedData struct {
Change *ethpb.SignedBLSToExecutionChange
}
// ExecutionProofReceivedData is the data sent with ExecutionProofReceived events.
type ExecutionProofReceivedData struct {
ExecutionProof *ethpb.ExecutionProof
}
// BlobSidecarReceivedData is the data sent with BlobSidecarReceived events.
type BlobSidecarReceivedData struct {
Blob *blocks.VerifiedROBlob

View File

@@ -114,17 +114,32 @@ func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
}
committeesPerSlot := helpers.SlotCommitteeCount(activeCount)
out := make([]primitives.ValidatorIndex, 0, activeCount/uint64(params.BeaconConfig().SlotsPerEpoch))
for i := primitives.CommitteeIndex(0); i < primitives.CommitteeIndex(committeesPerSlot); i++ {
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, i)
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon committee %d", i)
selected := make([]primitives.ValidatorIndex, 0, fieldparams.PTCSize)
var i uint64
for uint64(len(selected)) < fieldparams.PTCSize {
if ctx.Err() != nil {
return nil, ctx.Err()
}
for committeeIndex := primitives.CommitteeIndex(0); committeeIndex < primitives.CommitteeIndex(committeesPerSlot); committeeIndex++ {
if uint64(len(selected)) >= fieldparams.PTCSize {
break
}
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, committeeIndex)
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon committee %d", committeeIndex)
}
selected, i, err = selectByBalanceFill(ctx, st, committee, seed, selected, i)
if err != nil {
return nil, errors.Wrapf(err, "failed to sample beacon committee %d", committeeIndex)
}
}
out = append(out, committee...)
}
return selectByBalance(ctx, st, out, seed, fieldparams.PTCSize)
return selected, nil
}
// ptcSeed computes the seed for the payload timeliness committee.
@@ -148,33 +163,39 @@ func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitiv
// if compute_balance_weighted_acceptance(state, indices[next], seed, i):
// selected.append(indices[next])
// i += 1
func selectByBalance(ctx context.Context, st state.ReadOnlyBeaconState, candidates []primitives.ValidatorIndex, seed [32]byte, count uint64) ([]primitives.ValidatorIndex, error) {
if len(candidates) == 0 {
return nil, errors.New("no candidates for balance weighted selection")
}
func selectByBalanceFill(
ctx context.Context,
st state.ReadOnlyBeaconState,
candidates []primitives.ValidatorIndex,
seed [32]byte,
selected []primitives.ValidatorIndex,
i uint64,
) ([]primitives.ValidatorIndex, uint64, error) {
hashFunc := hash.CustomSHA256Hasher()
// Pre-allocate buffer for hash input: seed (32 bytes) + round counter (8 bytes).
var buf [40]byte
copy(buf[:], seed[:])
maxBalance := params.BeaconConfig().MaxEffectiveBalanceElectra
selected := make([]primitives.ValidatorIndex, 0, count)
total := uint64(len(candidates))
for i := uint64(0); uint64(len(selected)) < count; i++ {
for _, idx := range candidates {
if ctx.Err() != nil {
return nil, ctx.Err()
return nil, i, ctx.Err()
}
idx := candidates[i%total]
ok, err := acceptByBalance(st, idx, buf[:], hashFunc, maxBalance, i)
if err != nil {
return nil, err
return nil, i, err
}
if ok {
selected = append(selected, idx)
}
if uint64(len(selected)) == fieldparams.PTCSize {
break
}
i++
}
return selected, nil
return selected, i, nil
}
// acceptByBalance determines if a validator is accepted based on its effective balance.

View File

@@ -33,6 +33,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -1,11 +1,15 @@
package peerdas
import (
stderrors "errors"
"iter"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/container/trie"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
)
@@ -16,6 +20,7 @@ var (
ErrIndexTooLarge = errors.New("column index is larger than the specified columns count")
ErrNoKzgCommitments = errors.New("no KZG commitments found")
ErrMismatchLength = errors.New("mismatch in the length of the column, commitments or proofs")
ErrEmptySegment = errors.New("empty segment in batch")
ErrInvalidKZGProof = errors.New("invalid KZG proof")
ErrBadRootLength = errors.New("bad root length")
ErrInvalidInclusionProof = errors.New("invalid inclusion proof")
@@ -57,80 +62,113 @@ func VerifyDataColumnSidecar(sidecar blocks.RODataColumn) error {
return nil
}
// VerifyDataColumnsSidecarKZGProofs verifies if the KZG proofs are correct.
// CellProofBundleSegment is returned when a batch fails. The caller can call
// the `.Verify` method to verify just this segment.
type CellProofBundleSegment struct {
indices []uint64
commitments []kzg.Bytes48
cells []kzg.Cell
proofs []kzg.Bytes48
}
// Verify verifies this segment without batching.
func (s CellProofBundleSegment) Verify() error {
if len(s.cells) == 0 {
return ErrEmptySegment
}
verified, err := kzg.VerifyCellKZGProofBatch(s.commitments, s.indices, s.cells, s.proofs)
if err != nil {
return stderrors.Join(err, ErrInvalidKZGProof)
}
if !verified {
return ErrInvalidKZGProof
}
return nil
}
func VerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIter iter.Seq[blocks.CellProofBundle]) error {
// ignore the failed segment list since we are just passing in one segment.
_, err := BatchVerifyDataColumnsCellsKZGProofs(sizeHint, []iter.Seq[blocks.CellProofBundle]{cellProofsIter})
return err
}
// BatchVerifyDataColumnsCellsKZGProofs verifies if the KZG proofs are correct.
// Note: We are slightly deviating from the specification here:
// The specification verifies the KZG proofs for each sidecar separately,
// while we are verifying all the KZG proofs from multiple sidecars in a batch.
// This is done to improve performance since the internal KZG library is way more
// efficient when verifying in batch.
// efficient when verifying in batch. If the batch fails, the failed segments
// are returned to the caller so that they may try segment by segment without
// batching. On success the failed segment list is empty.
//
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) error {
// Compute the total count.
count := 0
for _, sidecar := range sidecars {
count += len(sidecar.Column)
}
func BatchVerifyDataColumnsCellsKZGProofs(sizeHint int, cellProofsIters []iter.Seq[blocks.CellProofBundle]) ( /* failed segment list */ []CellProofBundleSegment, error) {
commitments := make([]kzg.Bytes48, 0, sizeHint)
indices := make([]uint64, 0, sizeHint)
cells := make([]kzg.Cell, 0, sizeHint)
proofs := make([]kzg.Bytes48, 0, sizeHint)
commitments := make([]kzg.Bytes48, 0, count)
indices := make([]uint64, 0, count)
cells := make([]kzg.Cell, 0, count)
proofs := make([]kzg.Bytes48, 0, count)
for _, sidecar := range sidecars {
for i := range sidecar.Column {
var anySegmentEmpty bool
var segments []CellProofBundleSegment
for _, cellProofsIter := range cellProofsIters {
startIdx := len(cells)
for bundle := range cellProofsIter {
var (
commitment kzg.Bytes48
cell kzg.Cell
proof kzg.Bytes48
)
commitmentBytes := sidecar.KzgCommitments[i]
cellBytes := sidecar.Column[i]
proofBytes := sidecar.KzgProofs[i]
if len(commitmentBytes) != len(commitment) ||
len(cellBytes) != len(cell) ||
len(proofBytes) != len(proof) {
return ErrMismatchLength
if len(bundle.Commitment) != len(commitment) ||
len(bundle.Cell) != len(cell) ||
len(bundle.Proof) != len(proof) {
return nil, ErrMismatchLength
}
copy(commitment[:], commitmentBytes)
copy(cell[:], cellBytes)
copy(proof[:], proofBytes)
copy(commitment[:], bundle.Commitment)
copy(cell[:], bundle.Cell)
copy(proof[:], bundle.Proof)
commitments = append(commitments, commitment)
indices = append(indices, sidecar.Index)
indices = append(indices, bundle.ColumnIndex)
cells = append(cells, cell)
proofs = append(proofs, proof)
}
if len(cells[startIdx:]) == 0 {
anySegmentEmpty = true
}
segments = append(segments, CellProofBundleSegment{
indices: indices[startIdx:],
commitments: commitments[startIdx:],
cells: cells[startIdx:],
proofs: proofs[startIdx:],
})
}
if anySegmentEmpty {
return segments, ErrEmptySegment
}
// Batch verify that the cells match the corresponding commitments and proofs.
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
if err != nil {
return errors.Wrap(err, "verify cell KZG proof batch")
return segments, stderrors.Join(err, ErrInvalidKZGProof)
}
if !verified {
return ErrInvalidKZGProof
return segments, ErrInvalidKZGProof
}
return nil
return nil, nil
}
// VerifyDataColumnSidecarInclusionProof verifies if the given KZG commitments included in the given beacon block.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_inclusion_proof
func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
if sidecar.SignedBlockHeader == nil || sidecar.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
root := sidecar.SignedBlockHeader.Header.BodyRoot
if len(root) != fieldparams.RootLength {
// verifyKzgCommitmentsInclusionProof is the shared implementation for inclusion proof verification.
func verifyKzgCommitmentsInclusionProof(bodyRoot []byte, kzgCommitments [][]byte, inclusionProof [][]byte) error {
if len(bodyRoot) != fieldparams.RootLength {
return ErrBadRootLength
}
leaves := blocks.LeavesFromCommitments(sidecar.KzgCommitments)
leaves := blocks.LeavesFromCommitments(kzgCommitments)
sparse, err := trie.GenerateTrieFromItems(leaves, fieldparams.LogMaxBlobCommitments)
if err != nil {
@@ -142,7 +180,7 @@ func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
return errors.Wrap(err, "hash tree root")
}
verified := trie.VerifyMerkleProof(root, hashTreeRoot[:], kzgPosition, sidecar.KzgCommitmentsInclusionProof)
verified := trie.VerifyMerkleProof(bodyRoot, hashTreeRoot[:], kzgPosition, inclusionProof)
if !verified {
return ErrInvalidInclusionProof
}
@@ -150,6 +188,31 @@ func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
return nil
}
// VerifyDataColumnSidecarInclusionProof verifies if the given KZG commitments included in the given beacon block.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#verify_data_column_sidecar_inclusion_proof
func VerifyDataColumnSidecarInclusionProof(sidecar blocks.RODataColumn) error {
if sidecar.SignedBlockHeader == nil || sidecar.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
return verifyKzgCommitmentsInclusionProof(
sidecar.SignedBlockHeader.Header.BodyRoot,
sidecar.KzgCommitments,
sidecar.KzgCommitmentsInclusionProof,
)
}
// VerifyPartialDataColumnHeaderInclusionProof verifies if the KZG commitments are included in the beacon block.
func VerifyPartialDataColumnHeaderInclusionProof(header *ethpb.PartialDataColumnHeader) error {
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return ErrNilBlockHeader
}
return verifyKzgCommitmentsInclusionProof(
header.SignedBlockHeader.Header.BodyRoot,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
}
// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#compute_subnet_for_data_column_sidecar
func ComputeSubnetForDataColumnSidecar(columnIndex uint64) uint64 {

View File

@@ -3,6 +3,7 @@ package peerdas_test
import (
"crypto/rand"
"fmt"
"iter"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
@@ -72,7 +73,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0] = sidecars[0].Column[0][:len(sidecars[0].Column[0])-1] // Remove one byte to create size mismatch
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
require.ErrorIs(t, err, peerdas.ErrMismatchLength)
})
@@ -80,14 +81,15 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
sidecars[0].Column[0][0]++ // It is OK to overflow
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
require.ErrorIs(t, err, peerdas.ErrInvalidKZGProof)
})
t.Run("nominal", func(t *testing.T) {
sidecars := generateRandomSidecars(t, seed, blobCount)
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
failedSegments, err := peerdas.BatchVerifyDataColumnsCellsKZGProofs(blobCount, []iter.Seq[blocks.CellProofBundle]{blocks.RODataColumnsToCellProofBundles(sidecars)})
require.NoError(t, err)
require.Equal(t, 0, len(failedSegments))
})
}
@@ -273,7 +275,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_SameCommitments_NoBatch(b *testin
for _, sidecar := range sidecars {
sidecars := []blocks.RODataColumn{sidecar}
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(sidecars))
b.StopTimer()
require.NoError(b, err)
}
@@ -308,7 +310,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch(b *testing.
}
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(allSidecars)
err := peerdas.VerifyDataColumnsCellsKZGProofs(0, blocks.RODataColumnsToCellProofBundles(allSidecars))
b.StopTimer()
require.NoError(b, err)
}
@@ -341,7 +343,7 @@ func BenchmarkVerifyDataColumnSidecarKZGProofs_DiffCommitments_Batch4(b *testing
for _, sidecars := range allSidecars {
b.StartTimer()
err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
err := peerdas.VerifyDataColumnsCellsKZGProofs(len(allSidecars), blocks.RODataColumnsToCellProofBundles(sidecars))
b.StopTimer()
require.NoError(b, err)
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -339,7 +340,8 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg
}
// ComputeCellsAndProofsFromStructured computes the cells and proofs from blobs and cell proofs.
func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([][]kzg.Cell, [][]kzg.Proof, error) {
// commitmentCount is required to return the correct sized bitlist even if we see a nil slice of blobsAndProofs.
func ComputeCellsAndProofsFromStructured(commitmentCount uint64, blobsAndProofs []*pb.BlobAndProofV2) (bitfield.Bitlist /* parts included */, [][]kzg.Cell, [][]kzg.Proof, error) {
start := time.Now()
defer func() {
cellsAndProofsFromStructuredComputationTime.Observe(float64(time.Since(start).Milliseconds()))
@@ -347,14 +349,24 @@ func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([
var wg errgroup.Group
cellsPerBlob := make([][]kzg.Cell, len(blobsAndProofs))
proofsPerBlob := make([][]kzg.Proof, len(blobsAndProofs))
var blobsPresent int
for _, blobAndProof := range blobsAndProofs {
if blobAndProof != nil {
blobsPresent++
}
}
cellsPerBlob := make([][]kzg.Cell, blobsPresent)
proofsPerBlob := make([][]kzg.Proof, blobsPresent)
included := bitfield.NewBitlist(commitmentCount)
var j int
for i, blobAndProof := range blobsAndProofs {
if blobAndProof == nil {
return nil, nil, ErrNilBlobAndProof
continue
}
included.SetBitAt(uint64(i), true)
compactIndex := j
wg.Go(func() error {
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) {
@@ -381,17 +393,18 @@ func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([
kzgProofs = append(kzgProofs, kzgProof)
}
cellsPerBlob[i] = cells
proofsPerBlob[i] = kzgProofs
cellsPerBlob[compactIndex] = cells
proofsPerBlob[compactIndex] = kzgProofs
return nil
})
j++
}
if err := wg.Wait(); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
return cellsPerBlob, proofsPerBlob, nil
return included, cellsPerBlob, proofsPerBlob, nil
}
// ReconstructBlobs reconstructs blobs from data column sidecars without computing KZG proofs or creating sidecars.

View File

@@ -479,8 +479,9 @@ func TestComputeCellsAndProofsFromFlat(t *testing.T) {
func TestComputeCellsAndProofsFromStructured(t *testing.T) {
t.Run("nil blob and proof", func(t *testing.T) {
_, _, err := peerdas.ComputeCellsAndProofsFromStructured([]*pb.BlobAndProofV2{nil})
require.ErrorIs(t, err, peerdas.ErrNilBlobAndProof)
included, _, _, err := peerdas.ComputeCellsAndProofsFromStructured(0, []*pb.BlobAndProofV2{nil})
require.NoError(t, err)
require.Equal(t, uint64(0), included.Count())
})
t.Run("nominal", func(t *testing.T) {
@@ -533,7 +534,8 @@ func TestComputeCellsAndProofsFromStructured(t *testing.T) {
require.NoError(t, err)
// Test ComputeCellsAndProofs
actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobsAndProofs)
included, actualCellsPerBlob, actualProofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(blobsAndProofs)), blobsAndProofs)
require.Equal(t, included.Count(), uint64(len(actualCellsPerBlob)))
require.NoError(t, err)
require.Equal(t, blobCount, len(actualCellsPerBlob))

View File

@@ -3,6 +3,7 @@ package peerdas
import (
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
beaconState "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -143,6 +144,40 @@ func DataColumnSidecars(cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof,
return roSidecars, nil
}
func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofsPerBlob [][]kzg.Proof, src ConstructionPopulator) ([]blocks.PartialDataColumn, error) {
start := time.Now()
const numberOfColumns = uint64(fieldparams.NumberOfColumns)
cells, proofs, err := rotateRowsToCols(cellsPerBlob, proofsPerBlob, numberOfColumns)
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
dataColumns := make([]blocks.PartialDataColumn, 0, numberOfColumns)
for idx := range numberOfColumns {
dc, err := blocks.NewPartialDataColumn(info.signedBlockHeader, idx, info.kzgCommitments, info.kzgInclusionProof)
if err != nil {
return nil, errors.Wrap(err, "new ro data column")
}
for i := range len(info.kzgCommitments) {
if !included.BitAt(uint64(i)) {
continue
}
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
dataColumns = append(dataColumns, dc)
}
dataColumnComputationTime.Observe(float64(time.Since(start).Milliseconds()))
return dataColumns, nil
}
// Slot returns the slot of the source
func (s *BlockReconstructionSource) Slot() primitives.Slot {
return s.Block().Slot()

View File

@@ -7,6 +7,7 @@ go_library(
"cache.go",
"data_column.go",
"data_column_cache.go",
"doc.go",
"iteration.go",
"layout.go",
"layout_by_epoch.go",
@@ -14,8 +15,6 @@ go_library(
"log.go",
"metrics.go",
"mock.go",
"proof.go",
"proof_cache.go",
"pruner.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem",
@@ -31,7 +30,6 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/logging:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
@@ -54,7 +52,6 @@ go_test(
"iteration_test.go",
"layout_test.go",
"migration_test.go",
"proof_test.go",
"pruner_test.go",
],
embed = [":go_default_library"],

View File

@@ -0,0 +1,104 @@
package filesystem
// nolint:dupword
/*
Data column sidecars storage documentation
==========================================
File organisation
-----------------
- The first byte represents the version of the file structure (up to 0xff = 255).
We set it to 0x01.
Note: This is not strictly needed, but it will help a lot if, in the future,
we want to modify the file structure.
- The next 4 bytes represents the size of a SSZ encoded data column sidecar.
(See the `Computation of the maximum size of a DataColumnSidecar` section to a description
of how this value is computed).
- The next 128 bytes represent the index in the file of a given column.
The first bit of each byte in the index is set to 0 if there is no data column,
and set to 1 if there is a data column.
The remaining 7 bits (from 0 to 127) represent the index of the data column.
This sentinel bit is needed to distinguish between the column with index 0 and no column.
Example: If the column with index 5 is in the 3th position in the file, then indices[5] = 0x80 + 0x03 = 0x83.
- The rest of the file is a repeat of the SSZ encoded data column sidecars.
|------------------------------------------|------------------------------------------------------------------------------------|
| Byte offset | Description |
|------------------------------------------|------------------------------------------------------------------------------------|
| 0 | version (1 byte) | sszEncodedDataColumnSidecarSize (4 bytes) | indices (128 bytes) |
|133 + 0*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
|133 + 1*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
|133 + 2*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
| ... | ... |
|133 + 127*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
|------------------------------------------|------------------------------------------------------------------------------------|
Each file is named after the block root where the data columns were data columns are committed to.
Example: `0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs`
Database organisation
---------------------
SSZ encoded data column sidecars are stored following the `by-epoch` layout.
- The first layer is a directory corresponding to the `period`, which corresponds to the epoch divided by the 4096.
- The second layer is a directory corresponding to the epoch.
- Then all files are stored in the epoch directory.
Example:
data-columns
├── 0
│   ├── 3638
│   │   ├── 0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs
│   │   ├── 0x2a855b1f6e9a2f04f8383e336325bf7d5ba02d1eab3ef90ef183736f8c768533.sszs
│   │   ├── ...
│   │   ├── 0xeb78e2b2350a71c640f1e96fea9e42f38e65705ab7e6e100c8bc9c589f2c5f2b.sszs
│   │   └── 0xeb7ee68da988fd20d773d45aad01dd62527734367a146e2b048715bd68a4e370.sszs
│   └── 3639
│      ├── 0x0fd231fe95e57936fa44f6c712c490b9e337a481b661dfd46768901e90444330.sszs
│      ├── 0x1bf5edff6b6ba2b65b1db325ff3312bbb57da461ef2ae651bd741af851aada3a.sszs
│      ├── ...
│      ├── 0xa156a527e631f858fee79fab7ef1fde3f6117a2e1201d47c09fbab0c6780c937.sszs
│      └── 0xcd80bc535ddc467dea1d19e0c39c1160875ccd1989061bcd8ce206e3c1261c87.sszs
└── 1
├── 4096
│   ├── 0x0d244009093e2bedb72eb265280290199e8c7bf1d90d7583c41af40d9f662269.sszs
│   ├── 0x11f420928d8de41c50e735caab0369996824a5299c5f054e097965855925697d.sszs
│   ├── ...
│   ├── 0xbe91fc782877ed400d95c02c61aebfdd592635d11f8e64c94b46abd84f45c967.sszs
│   └── 0xf246189f078f02d30173ff74605cf31c9e65b5e463275ebdbeb40476638135ff.sszs
└── 4097
   ├── 0x454d000674793c479e90504c0fe9827b50bb176ae022dab4e37d6a21471ab570.sszs
   ├── 0xac5eb7437d7190c48cfa863e3c45f96a7f8af371d47ac12ccda07129a06af763.sszs
   ├── ...
   ├── 0xb7df30561d9d92ab5fafdd96bca8b44526497c8debf0fc425c7a0770b2abeb83.sszs
   └── 0xc1dd0b1ae847b6ec62303a36d08c6a4a2e9e3ec4be3ff70551972a0ee3de9c14.sszs
Computation of the maximum size of a DataColumnSidecar
------------------------------------------------------
https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#datacolumnsidecar
class DataColumnSidecar(Container):
index: ColumnIndex # Index of column in extended matrix
column: List[Cell, MAX_BLOB_COMMITMENTS_PER_BLOCK]
kzg_commitments: List[KZGCommitment, MAX_BLOB_COMMITMENTS_PER_BLOCK]
kzg_proofs: List[KZGProof, MAX_BLOB_COMMITMENTS_PER_BLOCK]
signed_block_header: SignedBeaconBlockHeader
kzg_commitments_inclusion_proof: Vector[Bytes32, KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH]
- index: 2 bytes (ColumnIndex)
- `column`: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 64 (FIELD_ELEMENTS_PER_CELL) * 32 bytes (BYTES_PER_FIELD_ELEMENT) = 8,388,608 bytes
- kzg_commitments: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 48 bytes (KZGCommitment) = 196,608 bytes
- kzg_proofs: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 48 bytes (KZGProof) = 196,608 bytes
- signed_block_header: 2 bytes (Slot) + 2 bytes (ValidatorIndex) + 3 * 2 bytes (Root) + 96 bytes (BLSSignature) = 106 bytes
- kzg_commitments_inclusion_proof: 4 (KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH) * 32 bytes = 128 bytes
TOTAL: 8,782,060 bytes = 70,256,480 bits
log(70,256,480) / log(2) ~= 26.07
==> 32 bits (4 bytes) are enough to store the maximum size of a data column sidecar.
The maximum size of an SSZ encoded data column can be 2**32 bits = 536,879,912 bytes,
which left a room of 536,879,912 bytes - 8,782,060 bytes ~= 503 mega bytes to store the extra data needed by SSZ encoding (which is more than enough.)
*/

View File

@@ -1,197 +0,0 @@
# Filesystem storage documentation
This document describes the file formats and database organization for storing data column sidecars and execution proofs.
---
# Data column sidecars
## File organisation
- The first byte represents the version of the file structure (up to `0xff = 255`).
We set it to `0x01`.
_(Note: This is not strictly needed, but it will help a lot if, in the future, we want to modify the file structure.)_
- The next 4 bytes represents the size of a SSZ encoded data column sidecar.
(See the [Computation of the maximum size of a DataColumnSidecar](#computation-of-the-maximum-size-of-a-datacolumnsidecar) section for a description
of how this value is computed).
- The next 128 bytes represent the index in the file of a given column.
The first bit of each byte in the index is set to 0 if there is no data column,
and set to 1 if there is a data column.
The remaining 7 bits (from 0 to 127) represent the index of the data column.
This sentinel bit is needed to distinguish between the column with index 0 and no column.
**Example:** If the column with index 5 is in the 3rd position in the file, then `indices[5] = 0x80 + 0x03 = 0x83`.
- The rest of the file is a repeat of the SSZ encoded data column sidecars.
### File layout
| Byte offset | Description |
|-------------|-------------|
| `0` | `version (1 byte) \| sszEncodedDataColumnSidecarSize (4 bytes) \| indices (128 bytes)` |
| `133 + 0×sszEncodedDataColumnSidecarSize` | `sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes)` |
| `133 + 1×sszEncodedDataColumnSidecarSize` | `sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes)` |
| `133 + 2×sszEncodedDataColumnSidecarSize` | `sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes)` |
| ... | ... |
| `133 + 127×sszEncodedDataColumnSidecarSize` | `sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes)` |
Each file is named after the block root where the data columns are committed to.
**Example:** `0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs`
## Database organisation
SSZ encoded data column sidecars are stored following the `by-epoch` layout.
- The first layer is a directory corresponding to the `period`, which corresponds to the epoch divided by 4096.
- The second layer is a directory corresponding to the epoch.
- Then all files are stored in the epoch directory.
### Example directory structure
```
data-columns
├── 0
│ ├── 3638
│ │ ├── 0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs
│ │ ├── 0x2a855b1f6e9a2f04f8383e336325bf7d5ba02d1eab3ef90ef183736f8c768533.sszs
│ │ ├── ...
│ │ ├── 0xeb78e2b2350a71c640f1e96fea9e42f38e65705ab7e6e100c8bc9c589f2c5f2b.sszs
│ │ └── 0xeb7ee68da988fd20d773d45aad01dd62527734367a146e2b048715bd68a4e370.sszs
│ └── 3639
│ ├── 0x0fd231fe95e57936fa44f6c712c490b9e337a481b661dfd46768901e90444330.sszs
│ ├── 0x1bf5edff6b6ba2b65b1db325ff3312bbb57da461ef2ae651bd741af851aada3a.sszs
│ ├── ...
│ ├── 0xa156a527e631f858fee79fab7ef1fde3f6117a2e1201d47c09fbab0c6780c937.sszs
│ └── 0xcd80bc535ddc467dea1d19e0c39c1160875ccd1989061bcd8ce206e3c1261c87.sszs
└── 1
├── 4096
│ ├── 0x0d244009093e2bedb72eb265280290199e8c7bf1d90d7583c41af40d9f662269.sszs
│ ├── 0x11f420928d8de41c50e735caab0369996824a5299c5f054e097965855925697d.sszs
│ ├── ...
│ ├── 0xbe91fc782877ed400d95c02c61aebfdd592635d11f8e64c94b46abd84f45c967.sszs
│ └── 0xf246189f078f02d30173ff74605cf31c9e65b5e463275ebdbeb40476638135ff.sszs
└── 4097
├── 0x454d000674793c479e90504c0fe9827b50bb176ae022dab4e37d6a21471ab570.sszs
├── 0xac5eb7437d7190c48cfa863e3c45f96a7f8af371d47ac12ccda07129a06af763.sszs
├── ...
├── 0xb7df30561d9d92ab5fafdd96bca8b44526497c8debf0fc425c7a0770b2abeb83.sszs
└── 0xc1dd0b1ae847b6ec62303a36d08c6a4a2e9e3ec4be3ff70551972a0ee3de9c14.sszs
```
## Computation of the maximum size of a `DataColumnSidecar`
Reference: [Ethereum Consensus Specs - Fulu DAS Core](https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#datacolumnsidecar)
```python
class DataColumnSidecar(Container):
index: ColumnIndex # Index of column in extended matrix
column: List[Cell, MAX_BLOB_COMMITMENTS_PER_BLOCK]
kzg_commitments: List[KZGCommitment, MAX_BLOB_COMMITMENTS_PER_BLOCK]
kzg_proofs: List[KZGProof, MAX_BLOB_COMMITMENTS_PER_BLOCK]
signed_block_header: SignedBeaconBlockHeader
kzg_commitments_inclusion_proof: Vector[Bytes32, KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH]
```
### Size breakdown
| Field | Calculation | Size |
|-------|-------------|------|
| `index` | `ColumnIndex` | `2 bytes` |
| `column` | `4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) × 64 (FIELD_ELEMENTS_PER_CELL) × 32 bytes (BYTES_PER_FIELD_ELEMENT)` | `8,388,608 bytes` |
| `kzg_commitments` | `4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) × 48 bytes (KZGCommitment)` | `196,608 bytes` |
| `kzg_proofs` | `4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) × 48 bytes (KZGProof)` | `196,608 bytes` |
| `signed_block_header` | `2 bytes (Slot) + 2 bytes (ValidatorIndex) + 3 × 2 bytes (Root) + 96 bytes (BLSSignature)` | `106 bytes` |
| `kzg_commitments_inclusion_proof` | `4 (KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH) × 32 bytes` | `128 bytes` |
**TOTAL:** `8,782,060 bytes = 70,256,480 bits`
```
log(70,256,480) / log(2) ≈ 26.07
```
**Conclusion:** 32 bits (4 bytes) are enough to store the maximum size of a data column sidecar.
The maximum size of an SSZ encoded data column can be `2³² bits = 536,879,912 bytes`,
which leaves a room of `536,879,912 bytes - 8,782,060 bytes ≈ 503 megabytes` to store the extra data needed by SSZ encoding (which is more than enough).
---
# Execution proofs
## File organisation
Unlike data column sidecars (which have a fixed size per block), execution proofs have variable sizes.
To handle this, we use an offset table that stores the position and size of each proof.
- The first byte represents the version of the file structure (up to `0xff = 255`).
We set it to `0x01`.
- The next 64 bytes represent the offset table with 8 slots (one per proof type).
Each slot contains:
- 4 bytes for the offset (relative to end of header)
- 4 bytes for the size of the SSZ-encoded proof
If the size is 0, the proof is not present.
- The rest of the file contains the SSZ encoded proofs, stored contiguously.
### File layout
| Byte offset | Description |
|-------------|-------------|
| `0` | `version (1 byte) \| offsetTable (64 bytes)` |
| `65 + offsetTable[0].offset` | `sszEncodedProof (offsetTable[0].size bytes)` |
| `65 + offsetTable[1].offset` | `sszEncodedProof (offsetTable[1].size bytes)` |
| ... | ... |
| `65 + offsetTable[7].offset` | `sszEncodedProof (offsetTable[7].size bytes)` |
**Header size:** 1 (version) + 64 (offset table) = **65 bytes**
### Offset table entry format
Each slot in the offset table (8 bytes per slot):
- `offset` (4 bytes, big-endian): Offset from end of header where proof data begins
- `size` (4 bytes, big-endian): Size of the SSZ-encoded proof in bytes
**Note:** Offsets are relative to the end of the header (byte 65), not the start of the file.
This maximizes the usable range of the 4-byte offset field.
### Reading a proof with `proofID=N (O(1) access)`
1. Read header (65 bytes)
2. Check slot N: if `size == 0`, proof not present
3. Seek to `(65 + offset)`, read `size` bytes, SSZ unmarshal
Each file is named after the block root.
**Example:** `0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs`
## Database Organisation
SSZ encoded execution proofs are stored following the same `by-epoch` layout as data column sidecars.
- The first layer is a directory corresponding to the `period`, which corresponds to the epoch divided by 4096.
- The second layer is a directory corresponding to the epoch.
- Then all files are stored in the epoch directory.
### Example Directory Structure
```
proofs
├── 0
│ ├── 100
│ │ ├── 0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs
│ │ ├── 0x2a855b1f6e9a2f04f8383e336325bf7d5ba02d1eab3ef90ef183736f8c768533.sszs
│ │ └── ...
│ └── 101
│ ├── 0x0fd231fe95e57936fa44f6c712c490b9e337a481b661dfd46768901e90444330.sszs
│ └── ...
└── 1
└── 4096
├── 0x0d244009093e2bedb72eb265280290199e8c7bf1d90d7583c41af40d9f662269.sszs
└── ...
```

View File

@@ -70,36 +70,4 @@ var (
Name: "data_column_prune_latency",
Help: "Latency of data column prune operations in milliseconds",
})
// Proofs
proofSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "proof_storage_save_latency",
Help: "Latency of proof storage save operations in milliseconds",
Buckets: []float64{3, 5, 7, 9, 11, 13, 20, 50},
})
proofFetchLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "proof_storage_get_latency",
Help: "Latency of proof storage get operations in milliseconds",
Buckets: []float64{3, 5, 7, 9, 11, 13},
})
proofPrunedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "proof_pruned",
Help: "Number of proof files pruned.",
})
proofWrittenCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "proof_written",
Help: "Number of proof files written",
})
proofDiskCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "proof_disk_count",
Help: "Approximate number of proof files in storage",
})
proofFileSyncLatency = promauto.NewSummary(prometheus.SummaryOpts{
Name: "proof_file_sync_latency",
Help: "Latency of sync operations when saving proofs in milliseconds",
})
proofPruneLatency = promauto.NewSummary(prometheus.SummaryOpts{
Name: "proof_prune_latency",
Help: "Latency of proof prune operations in milliseconds",
})
)

View File

@@ -144,45 +144,3 @@ func NewEphemeralDataColumnStorageWithMocker(t testing.TB) (*DataColumnMocker, *
fs, dcs := NewEphemeralDataColumnStorageAndFs(t)
return &DataColumnMocker{fs: fs, dcs: dcs}, dcs
}
// Proofs
// ------
// NewEphemeralProofStorage should only be used for tests.
// The instance of ProofStorage returned is backed by an in-memory virtual filesystem,
// improving test performance and simplifying cleanup.
func NewEphemeralProofStorage(t testing.TB, opts ...ProofStorageOption) *ProofStorage {
return NewWarmedEphemeralProofStorageUsingFs(t, afero.NewMemMapFs(), opts...)
}
// NewEphemeralProofStorageAndFs can be used by tests that want access to the virtual filesystem
// in order to interact with it outside the parameters of the ProofStorage API.
func NewEphemeralProofStorageAndFs(t testing.TB, opts ...ProofStorageOption) (afero.Fs, *ProofStorage) {
fs := afero.NewMemMapFs()
ps := NewWarmedEphemeralProofStorageUsingFs(t, fs, opts...)
return fs, ps
}
// NewEphemeralProofStorageUsingFs creates a ProofStorage backed by the provided filesystem.
func NewEphemeralProofStorageUsingFs(t testing.TB, fs afero.Fs, opts ...ProofStorageOption) *ProofStorage {
defaultOpts := []ProofStorageOption{
WithProofRetentionEpochs(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest),
WithProofFs(fs),
}
// User opts come last so they can override defaults
allOpts := append(defaultOpts, opts...)
ps, err := NewProofStorage(context.Background(), allOpts...)
if err != nil {
t.Fatal(err)
}
return ps
}
// NewWarmedEphemeralProofStorageUsingFs creates a ProofStorage with a warmed cache.
func NewWarmedEphemeralProofStorageUsingFs(t testing.TB, fs afero.Fs, opts ...ProofStorageOption) *ProofStorage {
ps := NewEphemeralProofStorageUsingFs(t, fs, opts...)
ps.WarmCache()
return ps
}

View File

@@ -1,964 +0,0 @@
package filesystem
import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/async/event"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/io/file"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/spf13/afero"
)
const (
proofVersion = 0x01
proofVersionSize = 1 // bytes
maxProofTypes = 8 // ExecutionProofId max value (EXECUTION_PROOF_TYPE_COUNT)
proofOffsetSize = 4 // bytes for offset (uint32)
proofSizeSize = 4 // bytes for size (uint32)
proofSlotSize = proofOffsetSize + proofSizeSize // 8 bytes per slot
proofOffsetTableSize = maxProofTypes * proofSlotSize // 64 bytes
proofHeaderSize = proofVersionSize + proofOffsetTableSize // 65 bytes
proofsFileExtension = "sszs"
proofPrunePeriod = 1 * time.Minute
)
var (
errProofIDTooLarge = errors.New("proof ID too large")
errWrongProofBytesWritten = errors.New("wrong number of bytes written")
errWrongProofVersion = errors.New("wrong proof version")
errWrongProofBytesRead = errors.New("wrong number of bytes read")
errNoProofBasePath = errors.New("ProofStorage base path not specified in init")
errProofAlreadyExists = errors.New("proof already exists")
)
type (
// ProofIdent is a unique identifier for a proof.
ProofIdent struct {
BlockRoot [fieldparams.RootLength]byte
Epoch primitives.Epoch
ProofID uint64
}
// ProofsIdent is a collection of unique identifiers for proofs.
ProofsIdent struct {
BlockRoot [fieldparams.RootLength]byte
Epoch primitives.Epoch
ProofIDs []uint64
}
// ProofStorage is the concrete implementation of the filesystem backend for saving and retrieving ExecutionProofs.
ProofStorage struct {
base string
retentionEpochs primitives.Epoch
fs afero.Fs
cache *proofCache
proofFeed *event.Feed
pruneMu sync.RWMutex
mu sync.Mutex // protects muChans
muChans map[[fieldparams.RootLength]byte]*proofMuChan
}
// ProofStorageOption is a functional option for configuring a ProofStorage.
ProofStorageOption func(*ProofStorage) error
proofMuChan struct {
mu *sync.RWMutex
toStore chan []*ethpb.ExecutionProof
}
// proofSlotEntry represents the offset and size for a proof in the file.
proofSlotEntry struct {
offset uint32
size uint32
}
// proofOffsetTable is the offset table with 8 slots indexed by proofID.
proofOffsetTable [maxProofTypes]proofSlotEntry
// proofFileMetadata contains metadata extracted from a proof file path.
proofFileMetadata struct {
period uint64
epoch primitives.Epoch
blockRoot [fieldparams.RootLength]byte
}
)
// WithProofBasePath is a required option that sets the base path of proof storage.
func WithProofBasePath(base string) ProofStorageOption {
return func(ps *ProofStorage) error {
ps.base = base
return nil
}
}
// WithProofRetentionEpochs is an option that changes the number of epochs proofs will be persisted.
func WithProofRetentionEpochs(e primitives.Epoch) ProofStorageOption {
return func(ps *ProofStorage) error {
ps.retentionEpochs = e
return nil
}
}
// WithProofFs allows the afero.Fs implementation to be customized.
// Used by tests to substitute an in-memory filesystem.
func WithProofFs(fs afero.Fs) ProofStorageOption {
return func(ps *ProofStorage) error {
ps.fs = fs
return nil
}
}
// NewProofStorage creates a new instance of the ProofStorage object.
func NewProofStorage(ctx context.Context, opts ...ProofStorageOption) (*ProofStorage, error) {
storage := &ProofStorage{
proofFeed: new(event.Feed),
muChans: make(map[[fieldparams.RootLength]byte]*proofMuChan),
}
for _, o := range opts {
if err := o(storage); err != nil {
return nil, fmt.Errorf("failed to create proof storage: %w", err)
}
}
// Allow tests to set up a different fs using WithProofFs.
if storage.fs == nil {
if storage.base == "" {
return nil, errNoProofBasePath
}
storage.base = path.Clean(storage.base)
if err := file.MkdirAll(storage.base); err != nil {
return nil, fmt.Errorf("failed to create proof storage at %s: %w", storage.base, err)
}
storage.fs = afero.NewBasePathFs(afero.NewOsFs(), storage.base)
}
storage.cache = newProofCache()
async.RunEvery(ctx, proofPrunePeriod, func() {
storage.pruneMu.Lock()
defer storage.pruneMu.Unlock()
storage.prune()
})
return storage, nil
}
// WarmCache warms the cache of the proof filesystem.
func (ps *ProofStorage) WarmCache() {
start := time.Now()
log.Info("Proof filesystem cache warm-up started")
ps.pruneMu.Lock()
defer ps.pruneMu.Unlock()
// List all period directories
periodFileInfos, err := afero.ReadDir(ps.fs, ".")
if err != nil {
log.WithError(err).Error("Error reading top directory during proof warm cache")
return
}
// Iterate through periods
for _, periodFileInfo := range periodFileInfos {
if !periodFileInfo.IsDir() {
continue
}
periodPath := periodFileInfo.Name()
// List all epoch directories in this period
epochFileInfos, err := afero.ReadDir(ps.fs, periodPath)
if err != nil {
log.WithError(err).WithField("period", periodPath).Error("Error reading period directory during proof warm cache")
continue
}
// Iterate through epochs
for _, epochFileInfo := range epochFileInfos {
if !epochFileInfo.IsDir() {
continue
}
epochPath := path.Join(periodPath, epochFileInfo.Name())
// List all .sszs files in this epoch
files, err := ps.listProofEpochFiles(epochPath)
if err != nil {
log.WithError(err).WithField("epoch", epochPath).Error("Error listing epoch files during proof warm cache")
continue
}
// Process all files in this epoch in parallel
ps.processProofEpochFiles(files)
}
}
// Prune the cache and the filesystem
ps.prune()
totalElapsed := time.Since(start)
log.WithField("elapsed", totalElapsed).Info("Proof filesystem cache warm-up complete")
}
// listProofEpochFiles lists all .sszs files in an epoch directory.
func (ps *ProofStorage) listProofEpochFiles(epochPath string) ([]string, error) {
fileInfos, err := afero.ReadDir(ps.fs, epochPath)
if err != nil {
return nil, fmt.Errorf("read epoch directory: %w", err)
}
files := make([]string, 0, len(fileInfos))
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
continue
}
fileName := fileInfo.Name()
if strings.HasSuffix(fileName, "."+proofsFileExtension) {
files = append(files, path.Join(epochPath, fileName))
}
}
return files, nil
}
// processProofEpochFiles processes all proof files in an epoch in parallel.
func (ps *ProofStorage) processProofEpochFiles(files []string) {
var wg sync.WaitGroup
for _, filePath := range files {
wg.Go(func() {
if err := ps.processProofFile(filePath); err != nil {
log.WithError(err).WithField("file", filePath).Error("Error processing proof file during warm cache")
}
})
}
wg.Wait()
}
// processProofFile processes a single .sszs proof file for cache warming.
func (ps *ProofStorage) processProofFile(filePath string) error {
// Extract metadata from the file path
fileMetadata, err := extractProofFileMetadata(filePath)
if err != nil {
return fmt.Errorf("extract proof file metadata: %w", err)
}
// Open the file
f, err := ps.fs.Open(filePath)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer func() {
if closeErr := f.Close(); closeErr != nil {
log.WithError(closeErr).WithField("file", filePath).Error("Error closing file during proof warm cache")
}
}()
// Read the offset table
offsetTable, _, err := ps.readHeader(f)
if err != nil {
return fmt.Errorf("read header: %w", err)
}
// Add all present proofs to the cache
for proofID, entry := range offsetTable {
if entry.size == 0 {
continue
}
proofIdent := ProofIdent{
BlockRoot: fileMetadata.blockRoot,
Epoch: fileMetadata.epoch,
ProofID: uint64(proofID),
}
ps.cache.set(proofIdent)
}
return nil
}
// Summary returns the ProofStorageSummary for a given root.
func (ps *ProofStorage) Summary(root [fieldparams.RootLength]byte) ProofStorageSummary {
return ps.cache.Summary(root)
}
// Save saves execution proofs into the database.
func (ps *ProofStorage) Save(proofs []*ethpb.ExecutionProof) error {
startTime := time.Now()
if len(proofs) == 0 {
return nil
}
proofsByRoot := make(map[[fieldparams.RootLength]byte][]*ethpb.ExecutionProof)
// Group proofs by root.
for _, proof := range proofs {
// Check if the proof ID is valid.
proofID := uint64(proof.ProofId)
if proofID >= maxProofTypes {
return errProofIDTooLarge
}
// Extract block root from proof.
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], proof.BlockRoot)
// Group proofs by root.
proofsByRoot[blockRoot] = append(proofsByRoot[blockRoot], proof)
}
for blockRoot, proofsForRoot := range proofsByRoot {
// Compute epoch from slot.
epoch := slots.ToEpoch(proofsForRoot[0].Slot)
// Save proofs in the filesystem.
if err := ps.saveFilesystem(blockRoot, epoch, proofsForRoot); err != nil {
return fmt.Errorf("save filesystem: %w", err)
}
// Get all proof IDs.
proofIDs := make([]uint64, 0, len(proofsForRoot))
for _, proof := range proofsForRoot {
proofIDs = append(proofIDs, uint64(proof.ProofId))
}
// Compute the proofs ident.
proofsIdent := ProofsIdent{BlockRoot: blockRoot, Epoch: epoch, ProofIDs: proofIDs}
// Set proofs in the cache.
ps.cache.setMultiple(proofsIdent)
// Notify the proof feed.
ps.proofFeed.Send(proofsIdent)
}
proofSaveLatency.Observe(float64(time.Since(startTime).Milliseconds()))
return nil
}
// saveFilesystem saves proofs into the database.
// This function expects all proofs to belong to the same block.
func (ps *ProofStorage) saveFilesystem(root [fieldparams.RootLength]byte, epoch primitives.Epoch, proofs []*ethpb.ExecutionProof) error {
// Compute the file path.
filePath := proofFilePath(root, epoch)
ps.pruneMu.RLock()
defer ps.pruneMu.RUnlock()
fileMu, toStore := ps.fileMutexChan(root)
toStore <- proofs
fileMu.Lock()
defer fileMu.Unlock()
// Check if the file exists.
exists, err := afero.Exists(ps.fs, filePath)
if err != nil {
return fmt.Errorf("afero exists: %w", err)
}
if exists {
if err := ps.saveProofExistingFile(filePath, toStore); err != nil {
return fmt.Errorf("save proof existing file: %w", err)
}
return nil
}
if err := ps.saveProofNewFile(filePath, toStore); err != nil {
return fmt.Errorf("save proof new file: %w", err)
}
return nil
}
// Subscribe subscribes to the proof feed.
// It returns the subscription and a 1-size buffer channel to receive proof idents.
func (ps *ProofStorage) Subscribe() (event.Subscription, <-chan ProofsIdent) {
identsChan := make(chan ProofsIdent, 1)
subscription := ps.proofFeed.Subscribe(identsChan)
return subscription, identsChan
}
// Get retrieves execution proofs from the database.
// If one of the requested proofs is not found, it is just skipped.
// If proofIDs is nil, then all stored proofs are returned.
func (ps *ProofStorage) Get(root [fieldparams.RootLength]byte, proofIDs []uint64) ([]*ethpb.ExecutionProof, error) {
ps.pruneMu.RLock()
defer ps.pruneMu.RUnlock()
fileMu, _ := ps.fileMutexChan(root)
fileMu.RLock()
defer fileMu.RUnlock()
startTime := time.Now()
// Build all proofIDs if none are provided.
if proofIDs == nil {
proofIDs = make([]uint64, maxProofTypes)
for i := range proofIDs {
proofIDs[i] = uint64(i)
}
}
summary, ok := ps.cache.get(root)
if !ok {
// Nothing found in db. Exit early.
return nil, nil
}
// Check if any requested proofID exists.
if !slices.ContainsFunc(proofIDs, summary.HasProof) {
return nil, nil
}
// Compute the file path.
filePath := proofFilePath(root, summary.epoch)
// Open the proof file.
file, err := ps.fs.Open(filePath)
if err != nil {
return nil, fmt.Errorf("proof file open: %w", err)
}
defer func() {
if closeErr := file.Close(); closeErr != nil {
log.WithError(closeErr).WithField("file", filePath).Error("Error closing proof file")
}
}()
// Read the header.
offsetTable, _, err := ps.readHeader(file)
if err != nil {
return nil, fmt.Errorf("read header: %w", err)
}
// Retrieve proofs from the file.
proofs := make([]*ethpb.ExecutionProof, 0, len(proofIDs))
for _, proofID := range proofIDs {
if proofID >= maxProofTypes {
continue
}
entry := offsetTable[proofID]
// Skip if the proof is not saved.
if entry.size == 0 {
continue
}
// Seek to the proof offset (offset is relative to end of header).
_, err = file.Seek(proofHeaderSize+int64(entry.offset), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("seek: %w", err)
}
// Read the SSZ encoded proof.
sszProof := make([]byte, entry.size)
n, err := io.ReadFull(file, sszProof)
if err != nil {
return nil, fmt.Errorf("read proof: %w", err)
}
if n != int(entry.size) {
return nil, errWrongProofBytesRead
}
// Unmarshal the proof.
proof := new(ethpb.ExecutionProof)
if err := proof.UnmarshalSSZ(sszProof); err != nil {
return nil, fmt.Errorf("unmarshal proof: %w", err)
}
proofs = append(proofs, proof)
}
proofFetchLatency.Observe(float64(time.Since(startTime).Milliseconds()))
return proofs, nil
}
// Remove deletes all proofs for a given root.
func (ps *ProofStorage) Remove(blockRoot [fieldparams.RootLength]byte) error {
ps.pruneMu.RLock()
defer ps.pruneMu.RUnlock()
fileMu, _ := ps.fileMutexChan(blockRoot)
fileMu.Lock()
defer fileMu.Unlock()
summary, ok := ps.cache.get(blockRoot)
if !ok {
// Nothing found in db. Exit early.
return nil
}
// Remove the proofs from the cache.
ps.cache.evict(blockRoot)
// Remove the proof file.
filePath := proofFilePath(blockRoot, summary.epoch)
if err := ps.fs.Remove(filePath); err != nil {
return fmt.Errorf("remove: %w", err)
}
return nil
}
// Clear deletes all files on the filesystem.
func (ps *ProofStorage) Clear() error {
ps.pruneMu.Lock()
defer ps.pruneMu.Unlock()
dirs, err := listDir(ps.fs, ".")
if err != nil {
return fmt.Errorf("list dir: %w", err)
}
ps.cache.clear()
for _, dir := range dirs {
if err := ps.fs.RemoveAll(dir); err != nil {
return fmt.Errorf("remove all: %w", err)
}
}
return nil
}
// saveProofNewFile saves proofs to a new file.
func (ps *ProofStorage) saveProofNewFile(filePath string, inputProofs chan []*ethpb.ExecutionProof) (err error) {
// Initialize the offset table.
var offsetTable proofOffsetTable
var sszEncodedProofs []byte
currentOffset := uint32(0)
for {
proofs := pullProofChan(inputProofs)
if len(proofs) == 0 {
break
}
for _, proof := range proofs {
proofID := uint64(proof.ProofId)
if proofID >= maxProofTypes {
continue
}
// Skip if already in offset table (duplicate).
if offsetTable[proofID].size != 0 {
continue
}
// SSZ encode the proof.
sszProof, err := proof.MarshalSSZ()
if err != nil {
return fmt.Errorf("marshal proof SSZ: %w", err)
}
proofSize := uint32(len(sszProof))
// Update offset table.
offsetTable[proofID] = proofSlotEntry{
offset: currentOffset,
size: proofSize,
}
// Append SSZ encoded proof.
sszEncodedProofs = append(sszEncodedProofs, sszProof...)
currentOffset += proofSize
}
}
if len(sszEncodedProofs) == 0 {
// Nothing to save.
return nil
}
// Create directory structure.
dir := filepath.Dir(filePath)
if err := ps.fs.MkdirAll(dir, directoryPermissions()); err != nil {
return fmt.Errorf("mkdir all: %w", err)
}
file, err := ps.fs.Create(filePath)
if err != nil {
return fmt.Errorf("create proof file: %w", err)
}
defer func() {
closeErr := file.Close()
if closeErr != nil && err == nil {
err = closeErr
}
}()
// Build the file content.
countToWrite := proofHeaderSize + len(sszEncodedProofs)
bytes := make([]byte, 0, countToWrite)
// Write version byte.
bytes = append(bytes, byte(proofVersion))
// Write offset table.
bytes = append(bytes, encodeOffsetTable(offsetTable)...)
// Write SSZ encoded proofs.
bytes = append(bytes, sszEncodedProofs...)
countWritten, err := file.Write(bytes)
if err != nil {
return fmt.Errorf("write: %w", err)
}
if countWritten != countToWrite {
return errWrongProofBytesWritten
}
syncStart := time.Now()
if err := file.Sync(); err != nil {
return fmt.Errorf("sync: %w", err)
}
proofFileSyncLatency.Observe(float64(time.Since(syncStart).Milliseconds()))
return nil
}
// saveProofExistingFile saves proofs to an existing file.
func (ps *ProofStorage) saveProofExistingFile(filePath string, inputProofs chan []*ethpb.ExecutionProof) (err error) {
// Open the file for read/write.
file, err := ps.fs.OpenFile(filePath, os.O_RDWR, os.FileMode(0600))
if err != nil {
return fmt.Errorf("open proof file: %w", err)
}
defer func() {
closeErr := file.Close()
if closeErr != nil && err == nil {
err = closeErr
}
}()
// Read current header.
offsetTable, fileSize, err := ps.readHeader(file)
if err != nil {
return fmt.Errorf("read header: %w", err)
}
var sszEncodedProofs []byte
currentOffset := uint32(fileSize - proofHeaderSize)
modified := false
for {
proofs := pullProofChan(inputProofs)
if len(proofs) == 0 {
break
}
for _, proof := range proofs {
proofID := uint64(proof.ProofId)
if proofID >= maxProofTypes {
continue
}
// Skip if proof already exists.
if offsetTable[proofID].size != 0 {
continue
}
// SSZ encode the proof.
sszProof, err := proof.MarshalSSZ()
if err != nil {
return fmt.Errorf("marshal proof SSZ: %w", err)
}
proofSize := uint32(len(sszProof))
// Update offset table.
offsetTable[proofID] = proofSlotEntry{
offset: currentOffset,
size: proofSize,
}
// Append SSZ encoded proof.
sszEncodedProofs = append(sszEncodedProofs, sszProof...)
currentOffset += proofSize
modified = true
}
}
if !modified {
return nil
}
// Write updated offset table back to file (at position 1, after version byte).
encodedTable := encodeOffsetTable(offsetTable)
count, err := file.WriteAt(encodedTable, int64(proofVersionSize))
if err != nil {
return fmt.Errorf("write offset table: %w", err)
}
if count != proofOffsetTableSize {
return errWrongProofBytesWritten
}
// Append the SSZ encoded proofs to the end of the file.
count, err = file.WriteAt(sszEncodedProofs, fileSize)
if err != nil {
return fmt.Errorf("write SSZ encoded proofs: %w", err)
}
if count != len(sszEncodedProofs) {
return errWrongProofBytesWritten
}
syncStart := time.Now()
if err := file.Sync(); err != nil {
return fmt.Errorf("sync: %w", err)
}
proofFileSyncLatency.Observe(float64(time.Since(syncStart).Milliseconds()))
return nil
}
// readHeader reads the file header and returns the offset table and file size.
func (ps *ProofStorage) readHeader(file afero.File) (proofOffsetTable, int64, error) {
var header [proofHeaderSize]byte
countRead, err := file.ReadAt(header[:], 0)
if err != nil {
return proofOffsetTable{}, 0, fmt.Errorf("read at: %w", err)
}
if countRead != proofHeaderSize {
return proofOffsetTable{}, 0, errWrongProofBytesRead
}
// Check version.
fileVersion := int(header[0])
if fileVersion != proofVersion {
return proofOffsetTable{}, 0, errWrongProofVersion
}
// Decode offset table and compute file size.
var offsetTable proofOffsetTable
fileSize := int64(proofHeaderSize)
for i := range offsetTable {
pos := proofVersionSize + i*proofSlotSize
offsetTable[i].offset = binary.BigEndian.Uint32(header[pos : pos+proofOffsetSize])
offsetTable[i].size = binary.BigEndian.Uint32(header[pos+proofOffsetSize : pos+proofSlotSize])
fileSize += int64(offsetTable[i].size)
}
return offsetTable, fileSize, nil
}
// prune cleans the cache, the filesystem and mutexes.
func (ps *ProofStorage) prune() {
startTime := time.Now()
defer func() {
proofPruneLatency.Observe(float64(time.Since(startTime).Milliseconds()))
}()
highestStoredEpoch := ps.cache.HighestEpoch()
// Check if we need to prune.
if highestStoredEpoch < ps.retentionEpochs {
return
}
highestEpochToPrune := highestStoredEpoch - ps.retentionEpochs
highestPeriodToPrune := proofPeriod(highestEpochToPrune)
// Prune the cache.
prunedCount := ps.cache.pruneUpTo(highestEpochToPrune)
if prunedCount == 0 {
return
}
proofPrunedCounter.Add(float64(prunedCount))
// Prune the filesystem.
periodFileInfos, err := afero.ReadDir(ps.fs, ".")
if err != nil {
log.WithError(err).Error("Error encountered while reading top directory during proof prune")
return
}
for _, periodFileInfo := range periodFileInfos {
periodStr := periodFileInfo.Name()
period, err := strconv.ParseUint(periodStr, 10, 64)
if err != nil {
log.WithError(err).Errorf("Error encountered while parsing period %s", periodStr)
continue
}
if period < highestPeriodToPrune {
// Remove everything lower than highest period to prune.
if err := ps.fs.RemoveAll(periodStr); err != nil {
log.WithError(err).Error("Error encountered while removing period directory during proof prune")
}
continue
}
if period > highestPeriodToPrune {
// Do not remove anything higher than highest period to prune.
continue
}
// if period == highestPeriodToPrune
epochFileInfos, err := afero.ReadDir(ps.fs, periodStr)
if err != nil {
log.WithError(err).Error("Error encountered while reading epoch directory during proof prune")
continue
}
for _, epochFileInfo := range epochFileInfos {
epochStr := epochFileInfo.Name()
epochDir := path.Join(periodStr, epochStr)
epoch, err := strconv.ParseUint(epochStr, 10, 64)
if err != nil {
log.WithError(err).Errorf("Error encountered while parsing epoch %s", epochStr)
continue
}
if primitives.Epoch(epoch) > highestEpochToPrune {
continue
}
if err := ps.fs.RemoveAll(epochDir); err != nil {
log.WithError(err).Error("Error encountered while removing epoch directory during proof prune")
continue
}
}
}
ps.mu.Lock()
defer ps.mu.Unlock()
clear(ps.muChans)
}
// fileMutexChan returns the file mutex and channel for a given block root.
func (ps *ProofStorage) fileMutexChan(root [fieldparams.RootLength]byte) (*sync.RWMutex, chan []*ethpb.ExecutionProof) {
ps.mu.Lock()
defer ps.mu.Unlock()
mc, ok := ps.muChans[root]
if !ok {
mc = &proofMuChan{
mu: new(sync.RWMutex),
toStore: make(chan []*ethpb.ExecutionProof, 1),
}
ps.muChans[root] = mc
return mc.mu, mc.toStore
}
return mc.mu, mc.toStore
}
// pullProofChan pulls proofs from the input channel until it is empty.
func pullProofChan(inputProofs chan []*ethpb.ExecutionProof) []*ethpb.ExecutionProof {
proofs := make([]*ethpb.ExecutionProof, 0, maxProofTypes)
for {
select {
case batch := <-inputProofs:
proofs = append(proofs, batch...)
default:
return proofs
}
}
}
// proofFilePath builds the file path in database for a given root and epoch.
func proofFilePath(root [fieldparams.RootLength]byte, epoch primitives.Epoch) string {
return path.Join(
fmt.Sprintf("%d", proofPeriod(epoch)),
fmt.Sprintf("%d", epoch),
fmt.Sprintf("%#x.%s", root, proofsFileExtension),
)
}
// extractProofFileMetadata extracts the metadata from a proof file path.
func extractProofFileMetadata(path string) (*proofFileMetadata, error) {
// Use filepath.Separator to handle both Windows (\) and Unix (/) path separators
parts := strings.Split(path, string(filepath.Separator))
if len(parts) != 3 {
return nil, fmt.Errorf("unexpected proof file %s", path)
}
period, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse period from %s: %w", path, err)
}
epoch, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse epoch from %s: %w", path, err)
}
partsRoot := strings.Split(parts[2], ".")
if len(partsRoot) != 2 {
return nil, fmt.Errorf("failed to parse root from %s", path)
}
blockRootString := partsRoot[0]
if len(blockRootString) != 2+2*fieldparams.RootLength || blockRootString[:2] != "0x" {
return nil, fmt.Errorf("unexpected proof file name %s", path)
}
if partsRoot[1] != proofsFileExtension {
return nil, fmt.Errorf("unexpected extension %s", path)
}
blockRootSlice, err := hex.DecodeString(blockRootString[2:])
if err != nil {
return nil, fmt.Errorf("decode string from %s: %w", path, err)
}
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], blockRootSlice)
result := &proofFileMetadata{period: period, epoch: primitives.Epoch(epoch), blockRoot: blockRoot}
return result, nil
}
// proofPeriod computes the period of a given epoch.
func proofPeriod(epoch primitives.Epoch) uint64 {
return uint64(epoch / params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
}
// encodeOffsetTable encodes the offset table to bytes.
func encodeOffsetTable(table proofOffsetTable) []byte {
result := make([]byte, proofOffsetTableSize)
for i, entry := range table {
offset := i * proofSlotSize
binary.BigEndian.PutUint32(result[offset:offset+proofOffsetSize], entry.offset)
binary.BigEndian.PutUint32(result[offset+proofOffsetSize:offset+proofSlotSize], entry.size)
}
return result
}

View File

@@ -1,206 +0,0 @@
package filesystem
import (
"slices"
"sync"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
// ProofStorageSummary represents cached information about the proofs on disk for each root the cache knows about.
type ProofStorageSummary struct {
epoch primitives.Epoch
proofIDs map[uint64]bool
}
// HasProof returns true if the proof with the given proofID is available in the filesystem.
func (s ProofStorageSummary) HasProof(proofID uint64) bool {
if s.proofIDs == nil {
return false
}
_, ok := s.proofIDs[proofID]
return ok
}
// Count returns the number of available proofs.
func (s ProofStorageSummary) Count() int {
return len(s.proofIDs)
}
// All returns all stored proofIDs sorted in ascending order.
func (s ProofStorageSummary) All() []uint64 {
if s.proofIDs == nil {
return nil
}
ids := make([]uint64, 0, len(s.proofIDs))
for id := range s.proofIDs {
ids = append(ids, id)
}
slices.Sort(ids)
return ids
}
type proofCache struct {
mu sync.RWMutex
proofCount float64
lowestCachedEpoch primitives.Epoch
highestCachedEpoch primitives.Epoch
cache map[[fieldparams.RootLength]byte]ProofStorageSummary
}
func newProofCache() *proofCache {
return &proofCache{
cache: make(map[[fieldparams.RootLength]byte]ProofStorageSummary),
lowestCachedEpoch: params.BeaconConfig().FarFutureEpoch,
}
}
// Summary returns the ProofStorageSummary for `root`.
// The ProofStorageSummary can be used to check for the presence of proofs based on proofID.
func (pc *proofCache) Summary(root [fieldparams.RootLength]byte) ProofStorageSummary {
pc.mu.RLock()
defer pc.mu.RUnlock()
return pc.cache[root]
}
// HighestEpoch returns the highest cached epoch.
func (pc *proofCache) HighestEpoch() primitives.Epoch {
pc.mu.RLock()
defer pc.mu.RUnlock()
return pc.highestCachedEpoch
}
// set adds a proof to the cache.
func (pc *proofCache) set(ident ProofIdent) {
pc.mu.Lock()
defer pc.mu.Unlock()
summary := pc.cache[ident.BlockRoot]
if summary.proofIDs == nil {
summary.proofIDs = make(map[uint64]bool)
}
summary.epoch = ident.Epoch
if _, exists := summary.proofIDs[ident.ProofID]; exists {
pc.cache[ident.BlockRoot] = summary
return
}
summary.proofIDs[ident.ProofID] = true
pc.lowestCachedEpoch = min(pc.lowestCachedEpoch, ident.Epoch)
pc.highestCachedEpoch = max(pc.highestCachedEpoch, ident.Epoch)
pc.cache[ident.BlockRoot] = summary
pc.proofCount++
proofDiskCount.Set(pc.proofCount)
proofWrittenCounter.Inc()
}
// setMultiple adds multiple proofs to the cache.
func (pc *proofCache) setMultiple(ident ProofsIdent) {
pc.mu.Lock()
defer pc.mu.Unlock()
summary := pc.cache[ident.BlockRoot]
if summary.proofIDs == nil {
summary.proofIDs = make(map[uint64]bool)
}
summary.epoch = ident.Epoch
addedCount := 0
for _, proofID := range ident.ProofIDs {
if _, exists := summary.proofIDs[proofID]; exists {
continue
}
summary.proofIDs[proofID] = true
addedCount++
}
if addedCount == 0 {
pc.cache[ident.BlockRoot] = summary
return
}
pc.lowestCachedEpoch = min(pc.lowestCachedEpoch, ident.Epoch)
pc.highestCachedEpoch = max(pc.highestCachedEpoch, ident.Epoch)
pc.cache[ident.BlockRoot] = summary
pc.proofCount += float64(addedCount)
proofDiskCount.Set(pc.proofCount)
proofWrittenCounter.Add(float64(addedCount))
}
// get returns the ProofStorageSummary for the given block root.
// If the root is not in the cache, the second return value will be false.
func (pc *proofCache) get(blockRoot [fieldparams.RootLength]byte) (ProofStorageSummary, bool) {
pc.mu.RLock()
defer pc.mu.RUnlock()
summary, ok := pc.cache[blockRoot]
return summary, ok
}
// evict removes the ProofStorageSummary for the given block root from the cache.
func (pc *proofCache) evict(blockRoot [fieldparams.RootLength]byte) int {
pc.mu.Lock()
defer pc.mu.Unlock()
summary, ok := pc.cache[blockRoot]
if !ok {
return 0
}
deleted := len(summary.proofIDs)
delete(pc.cache, blockRoot)
if deleted > 0 {
pc.proofCount -= float64(deleted)
proofDiskCount.Set(pc.proofCount)
}
return deleted
}
// pruneUpTo removes all entries from the cache up to the given target epoch included.
func (pc *proofCache) pruneUpTo(targetEpoch primitives.Epoch) uint64 {
pc.mu.Lock()
defer pc.mu.Unlock()
prunedCount := uint64(0)
newLowestCachedEpoch := params.BeaconConfig().FarFutureEpoch
newHighestCachedEpoch := primitives.Epoch(0)
for blockRoot, summary := range pc.cache {
epoch := summary.epoch
if epoch > targetEpoch {
newLowestCachedEpoch = min(newLowestCachedEpoch, epoch)
newHighestCachedEpoch = max(newHighestCachedEpoch, epoch)
}
if epoch <= targetEpoch {
prunedCount += uint64(len(summary.proofIDs))
delete(pc.cache, blockRoot)
}
}
if prunedCount > 0 {
pc.lowestCachedEpoch = newLowestCachedEpoch
pc.highestCachedEpoch = newHighestCachedEpoch
pc.proofCount -= float64(prunedCount)
proofDiskCount.Set(pc.proofCount)
}
return prunedCount
}
// clear removes all entries from the cache.
func (pc *proofCache) clear() uint64 {
return pc.pruneUpTo(params.BeaconConfig().FarFutureEpoch)
}

View File

@@ -1,407 +0,0 @@
package filesystem
import (
"encoding/binary"
"os"
"testing"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/spf13/afero"
)
func createTestProof(t *testing.T, slot primitives.Slot, proofID uint64, blockRoot [32]byte) *ethpb.ExecutionProof {
t.Helper()
return &ethpb.ExecutionProof{
ProofId: primitives.ExecutionProofId(proofID),
Slot: slot,
BlockHash: make([]byte, 32),
BlockRoot: blockRoot[:],
ProofData: []byte("test proof data for proofID " + string(rune('0'+proofID))),
}
}
// assertProofsEqual compares two proofs by comparing their SSZ-encoded bytes.
func assertProofsEqual(t *testing.T, expected, actual *ethpb.ExecutionProof) {
t.Helper()
expectedSSZ, err := expected.MarshalSSZ()
require.NoError(t, err)
actualSSZ, err := actual.MarshalSSZ()
require.NoError(t, err)
require.DeepEqual(t, expectedSSZ, actualSSZ)
}
func TestNewProofStorage(t *testing.T) {
ctx := t.Context()
t.Run("No base path", func(t *testing.T) {
_, err := NewProofStorage(ctx)
require.ErrorIs(t, err, errNoProofBasePath)
})
t.Run("Nominal", func(t *testing.T) {
dir := t.TempDir()
storage, err := NewProofStorage(ctx, WithProofBasePath(dir))
require.NoError(t, err)
require.Equal(t, dir, storage.base)
})
}
func TestProofSaveAndGet(t *testing.T) {
t.Run("proof ID too large", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
proof := &ethpb.ExecutionProof{
ProofId: primitives.ExecutionProofId(maxProofTypes), // too large
Slot: 1,
BlockHash: make([]byte, 32),
BlockRoot: make([]byte, 32),
ProofData: []byte("test"),
}
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.ErrorIs(t, err, errProofIDTooLarge)
})
t.Run("save empty slice", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
err := proofStorage.Save([]*ethpb.ExecutionProof{})
require.NoError(t, err)
})
t.Run("save and get single proof", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
proof := createTestProof(t, 32, 2, blockRoot)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Check summary
summary := proofStorage.Summary(blockRoot)
require.Equal(t, true, summary.HasProof(2))
require.Equal(t, false, summary.HasProof(0))
require.Equal(t, false, summary.HasProof(1))
require.Equal(t, 1, summary.Count())
// Get the proof
proofs, err := proofStorage.Get(blockRoot, []uint64{2})
require.NoError(t, err)
require.Equal(t, 1, len(proofs))
assertProofsEqual(t, proof, proofs[0])
})
t.Run("save and get multiple proofs", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
// Save first proof
proof1 := createTestProof(t, 32, 0, blockRoot)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof1})
require.NoError(t, err)
// Save second proof (should append to existing file)
proof2 := createTestProof(t, 32, 3, blockRoot)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof2})
require.NoError(t, err)
// Save third proof
proof3 := createTestProof(t, 32, 7, blockRoot)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof3})
require.NoError(t, err)
// Check summary
summary := proofStorage.Summary(blockRoot)
require.Equal(t, true, summary.HasProof(0))
require.Equal(t, false, summary.HasProof(1))
require.Equal(t, false, summary.HasProof(2))
require.Equal(t, true, summary.HasProof(3))
require.Equal(t, false, summary.HasProof(4))
require.Equal(t, false, summary.HasProof(5))
require.Equal(t, false, summary.HasProof(6))
require.Equal(t, true, summary.HasProof(7))
require.Equal(t, 3, summary.Count())
// Get all proofs
proofs, err := proofStorage.Get(blockRoot, nil)
require.NoError(t, err)
require.Equal(t, 3, len(proofs))
// Get specific proofs
proofs, err = proofStorage.Get(blockRoot, []uint64{0, 3})
require.NoError(t, err)
require.Equal(t, 2, len(proofs))
assertProofsEqual(t, proof1, proofs[0])
assertProofsEqual(t, proof2, proofs[1])
})
t.Run("duplicate proof is ignored", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
proof := createTestProof(t, 32, 2, blockRoot)
// Save first time
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Save same proof again (should be silently ignored)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Check count
summary := proofStorage.Summary(blockRoot)
require.Equal(t, 1, summary.Count())
// Get the proof
proofs, err := proofStorage.Get(blockRoot, nil)
require.NoError(t, err)
require.Equal(t, 1, len(proofs))
})
t.Run("get non-existent root", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
proofs, err := proofStorage.Get([fieldparams.RootLength]byte{1}, []uint64{0, 1, 2})
require.NoError(t, err)
require.Equal(t, 0, len(proofs))
})
t.Run("get non-existent proofIDs", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
proof := createTestProof(t, 32, 2, blockRoot)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Try to get proofIDs that don't exist
proofs, err := proofStorage.Get(blockRoot, []uint64{0, 1, 3, 4})
require.NoError(t, err)
require.Equal(t, 0, len(proofs))
})
}
func TestProofRemove(t *testing.T) {
t.Run("remove non-existent", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
err := proofStorage.Remove([fieldparams.RootLength]byte{1})
require.NoError(t, err)
})
t.Run("remove existing", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot1 := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
blockRoot2 := [32]byte{32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
proof1 := createTestProof(t, 32, 0, blockRoot1)
proof2 := createTestProof(t, 64, 1, blockRoot2)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof1})
require.NoError(t, err)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof2})
require.NoError(t, err)
// Remove first proof
err = proofStorage.Remove(blockRoot1)
require.NoError(t, err)
// Check first proof is gone
summary := proofStorage.Summary(blockRoot1)
require.Equal(t, 0, summary.Count())
proofs, err := proofStorage.Get(blockRoot1, nil)
require.NoError(t, err)
require.Equal(t, 0, len(proofs))
// Check second proof still exists
summary = proofStorage.Summary(blockRoot2)
require.Equal(t, 1, summary.Count())
proofs, err = proofStorage.Get(blockRoot2, nil)
require.NoError(t, err)
require.Equal(t, 1, len(proofs))
})
}
func TestProofClear(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot1 := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
blockRoot2 := [32]byte{32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
proof1 := createTestProof(t, 32, 0, blockRoot1)
proof2 := createTestProof(t, 64, 1, blockRoot2)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof1})
require.NoError(t, err)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof2})
require.NoError(t, err)
// Clear all
err = proofStorage.Clear()
require.NoError(t, err)
// Check both are gone
summary := proofStorage.Summary(blockRoot1)
require.Equal(t, 0, summary.Count())
summary = proofStorage.Summary(blockRoot2)
require.Equal(t, 0, summary.Count())
}
func TestProofWarmCache(t *testing.T) {
fs, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot1 := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
blockRoot2 := [32]byte{32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
// Save proofs
proof1a := createTestProof(t, 32, 0, blockRoot1)
proof1b := createTestProof(t, 32, 3, blockRoot1)
proof2 := createTestProof(t, 64, 5, blockRoot2)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof1a})
require.NoError(t, err)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof1b})
require.NoError(t, err)
err = proofStorage.Save([]*ethpb.ExecutionProof{proof2})
require.NoError(t, err)
// Verify files exist
files, err := afero.ReadDir(fs, "0/1")
require.NoError(t, err)
require.Equal(t, 1, len(files))
files, err = afero.ReadDir(fs, "0/2")
require.NoError(t, err)
require.Equal(t, 1, len(files))
// Create a new storage with the same filesystem
proofStorage2 := NewEphemeralProofStorageUsingFs(t, fs)
// Before warm cache, cache should be empty
summary := proofStorage2.Summary(blockRoot1)
require.Equal(t, 0, summary.Count())
// Warm cache
proofStorage2.WarmCache()
// After warm cache, cache should be populated
summary = proofStorage2.Summary(blockRoot1)
require.Equal(t, 2, summary.Count())
require.Equal(t, true, summary.HasProof(0))
require.Equal(t, true, summary.HasProof(3))
summary = proofStorage2.Summary(blockRoot2)
require.Equal(t, 1, summary.Count())
require.Equal(t, true, summary.HasProof(5))
}
func TestProofSubscribe(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
sub, ch := proofStorage.Subscribe()
defer sub.Unsubscribe()
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
proof := createTestProof(t, 32, 2, blockRoot)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Should receive notification
ident := <-ch
require.Equal(t, blockRoot, ident.BlockRoot)
require.DeepEqual(t, []uint64{2}, ident.ProofIDs)
require.Equal(t, primitives.Epoch(1), ident.Epoch)
}
func TestProofReadHeader(t *testing.T) {
t.Run("wrong version", func(t *testing.T) {
_, proofStorage := NewEphemeralProofStorageAndFs(t)
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
proof := createTestProof(t, 32, 0, blockRoot)
err := proofStorage.Save([]*ethpb.ExecutionProof{proof})
require.NoError(t, err)
// Get the file path
filePath := proofFilePath(blockRoot, 1)
// Alter the version
file, err := proofStorage.fs.OpenFile(filePath, os.O_RDWR, os.FileMode(0600))
require.NoError(t, err)
_, err = file.Write([]byte{42}) // wrong version
require.NoError(t, err)
// Try to read header
_, _, err = proofStorage.readHeader(file)
require.ErrorIs(t, err, errWrongProofVersion)
err = file.Close()
require.NoError(t, err)
})
}
func TestEncodeOffsetTable(t *testing.T) {
var table proofOffsetTable
table[0] = proofSlotEntry{offset: 0, size: 100}
table[3] = proofSlotEntry{offset: 100, size: 200}
table[7] = proofSlotEntry{offset: 300, size: 300}
encoded := encodeOffsetTable(table)
require.Equal(t, proofOffsetTableSize, len(encoded))
// Decode manually and verify
var decoded proofOffsetTable
for i := range decoded {
pos := i * proofSlotSize
decoded[i].offset = binary.BigEndian.Uint32(encoded[pos : pos+proofOffsetSize])
decoded[i].size = binary.BigEndian.Uint32(encoded[pos+proofOffsetSize : pos+proofSlotSize])
}
require.Equal(t, table, decoded)
}
func TestProofFilePath(t *testing.T) {
blockRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
epoch := primitives.Epoch(100)
path := proofFilePath(blockRoot, epoch)
require.Equal(t, "0/100/0x0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20.sszs", path)
}
func TestExtractProofFileMetadata(t *testing.T) {
t.Run("valid path", func(t *testing.T) {
path := "0/100/0x0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20.sszs"
metadata, err := extractProofFileMetadata(path)
require.NoError(t, err)
expectedRoot := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
require.Equal(t, uint64(0), metadata.period)
require.Equal(t, primitives.Epoch(100), metadata.epoch)
require.Equal(t, expectedRoot, metadata.blockRoot)
})
t.Run("invalid path - wrong number of parts", func(t *testing.T) {
_, err := extractProofFileMetadata("invalid/path.sszs")
require.ErrorContains(t, "unexpected proof file", err)
})
t.Run("invalid path - wrong extension", func(t *testing.T) {
_, err := extractProofFileMetadata("0/100/0x0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20.txt")
require.ErrorContains(t, "unexpected extension", err)
})
}

View File

@@ -67,7 +67,6 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
return subscribed
}
func TestUpdateCustodyInfo(t *testing.T) {
ctx := t.Context()

View File

@@ -73,6 +73,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
@@ -58,6 +59,7 @@ var (
fuluEngineEndpoints = []string{
GetPayloadMethodV5,
GetBlobsV2,
GetBlobsV3,
}
)
@@ -99,6 +101,8 @@ const (
GetBlobsV1 = "engine_getBlobsV1"
// GetBlobsV2 request string for JSON-RPC.
GetBlobsV2 = "engine_getBlobsV2"
// GetBlobsV3 request string for JSON-RPC.
GetBlobsV3 = "engine_getBlobsV3"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
@@ -122,7 +126,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -553,6 +557,22 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return result, handleRPCError(err)
}
func (s *Service) GetBlobsV3(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV3")
defer span.End()
start := time.Now()
if !s.capabilityCache.has(GetBlobsV3) {
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV3))
}
getBlobsV3RequestsTotal.Inc()
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV3, versionedHashes)
getBlobsV3Latency.Observe(time.Since(start).Seconds())
return result, handleRPCError(err)
}
// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
@@ -663,40 +683,47 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
return verifiedBlobs, nil
}
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
func (s *Service) ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
root := populator.Root()
// Fetch cells and proofs from the execution client using the KZG commitments from the sidecar.
commitments, err := populator.Commitments()
if err != nil {
return nil, wrapWithBlockRoot(err, root, "commitments")
return nil, nil, wrapWithBlockRoot(err, root, "commitments")
}
cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
included, cellsPerBlob, proofsPerBlob, err := s.fetchCellsAndProofsFromExecution(ctx, commitments)
log.Info("Received cells and proofs from execution client", "included", included, "cells count", len(cellsPerBlob), "err", err)
if err != nil {
return nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
return nil, nil, wrapWithBlockRoot(err, root, "fetch cells and proofs from execution client")
}
// Return early if nothing is returned from the EL.
if len(cellsPerBlob) == 0 {
return nil, nil
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, populator)
haveAllBlobs := included.Count() == uint64(len(commitments))
log.Info("Constructed partial columns", "haveAllBlobs", haveAllBlobs)
if haveAllBlobs {
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, partialColumns, nil
}
// Construct data column sidears from the signed block and cells and proofs.
roSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, populator)
if err != nil {
return nil, wrapWithBlockRoot(err, populator.Root(), "data column sidcars from column sidecar")
return nil, nil, wrapWithBlockRoot(err, populator.Root(), "partial columns from column sidecar")
}
// Upgrade the sidecars to verified sidecars.
// We trust the execution layer we are connected to, so we can upgrade the sidecar into a verified one.
verifiedROSidecars := upgradeSidecarsToVerifiedSidecars(roSidecars)
return verifiedROSidecars, nil
return nil, partialColumns, nil
}
// fetchCellsAndProofsFromExecution fetches cells and proofs from the execution client (using engine_getBlobsV2 execution API method)
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) ([][]kzg.Cell, [][]kzg.Proof, error) {
func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommitments [][]byte) (bitfield.Bitlist /* included parts */, [][]kzg.Cell, [][]kzg.Proof, error) {
// Collect KZG hashes for all blobs.
versionedHashes := make([]common.Hash, 0, len(kzgCommitments))
for _, commitment := range kzgCommitments {
@@ -704,24 +731,34 @@ func (s *Service) fetchCellsAndProofsFromExecution(ctx context.Context, kzgCommi
versionedHashes = append(versionedHashes, versionedHash)
}
var blobAndProofs []*pb.BlobAndProofV2
// Fetch all blobsAndCellsProofs from the execution client.
blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes)
if err != nil {
return nil, nil, errors.Wrapf(err, "get blobs V2")
var err error
useV3 := s.capabilityCache.has(GetBlobsV3)
if useV3 {
// v3 can return a partial response. V2 is all or nothing
blobAndProofs, err = s.GetBlobsV3(ctx, versionedHashes)
} else {
blobAndProofs, err = s.GetBlobsV2(ctx, versionedHashes)
}
// Return early if nothing is returned from the EL.
if len(blobAndProofV2s) == 0 {
return nil, nil, nil
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "get blobs V2/3")
}
// Compute cells and proofs from the blobs and cell proofs.
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(blobAndProofV2s)
included, cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromStructured(uint64(len(kzgCommitments)), blobAndProofs)
if err != nil {
return nil, nil, errors.Wrap(err, "compute cells and proofs")
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
}
if included.Count() == uint64(len(kzgCommitments)) {
getBlobsV3CompleteResponsesTotal.Inc()
} else if included.Count() > 0 {
getBlobsV3PartialResponsesTotal.Inc()
}
return cellsPerBlob, proofsPerBlob, nil
return included, cellsPerBlob, proofsPerBlob, nil
}
// upgradeSidecarsToVerifiedSidecars upgrades a list of data column sidecars into verified data column sidecars.

View File

@@ -2587,7 +2587,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
ctx := context.Background()
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
_, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
_, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.ErrorContains(t, "engine_getBlobsV2 is not supported", err)
})
@@ -2598,7 +2598,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 0, len(dataColumns))
})
@@ -2611,7 +2611,7 @@ func TestConstructDataColumnSidecars(t *testing.T) {
rpcClient, client := setupRpcClientV2(t, srv.URL, client)
defer rpcClient.Close()
dataColumns, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
dataColumns, _, err := client.ConstructDataColumnSidecars(ctx, peerdas.PopulateFromBlock(roBlock))
require.NoError(t, err)
require.Equal(t, 128, len(dataColumns))
})

View File

@@ -34,6 +34,25 @@ var (
Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000},
},
)
getBlobsV3RequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_requests_total",
Help: "Total number of engine_getBlobsV3 requests sent",
})
getBlobsV3CompleteResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_complete_responses_total",
Help: "Total number of complete engine_getBlobsV3 successful responses received",
})
getBlobsV3PartialResponsesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_engine_getBlobsV3_partial_responses_total",
Help: "Total number of engine_getBlobsV3 partial responses received",
})
getBlobsV3Latency = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "beacon_engine_getBlobsV3_request_duration_seconds",
Help: "Duration of engine_getBlobsV3 requests in seconds",
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 4},
},
)
errParseCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "execution_parse_error_count",
Help: "The number of errors that occurred while parsing execution payload",

View File

@@ -118,8 +118,8 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
}
// ConstructDataColumnSidecars is a mock implementation of the ConstructDataColumnSidecars method.
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error) {
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
func (e *EngineClient) ConstructDataColumnSidecars(context.Context, peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, []blocks.PartialDataColumn, error) {
return e.DataColumnSidecars, nil, e.ErrorDataColumnSidecars
}
// GetTerminalBlockHash --

View File

@@ -123,8 +123,6 @@ type BeaconNode struct {
BlobStorageOptions []filesystem.BlobStorageOption
DataColumnStorage *filesystem.DataColumnStorage
DataColumnStorageOptions []filesystem.DataColumnStorageOption
ProofStorage *filesystem.ProofStorage
ProofStorageOptions []filesystem.ProofStorageOption
verifyInitWaiter *verification.InitializerWaiter
lhsp *verification.LazyHeadStateProvider
syncChecker *initialsync.SyncChecker
@@ -229,15 +227,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
return nil, errors.Wrap(err, "could not clear data column storage")
}
if beacon.ProofStorage == nil {
proofStorage, err := filesystem.NewProofStorage(cliCtx.Context, beacon.ProofStorageOptions...)
if err != nil {
return nil, errors.Wrap(err, "new proof storage")
}
beacon.ProofStorage = proofStorage
}
bfs, err := startBaseServices(cliCtx, beacon, depositAddress, dbClearer)
if err != nil {
return nil, errors.Wrap(err, "could not start modules")
@@ -679,6 +668,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DB: b.db,
StateGen: b.stateGen,
ClockWaiter: b.ClockWaiter,
PartialDataColumns: b.cliCtx.Bool(flags.PartialDataColumns.Name),
})
if err != nil {
return err
@@ -758,13 +748,11 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithSyncComplete(syncComplete),
blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithDataColumnStorage(b.DataColumnStorage),
blockchain.WithProofStorage(b.ProofStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
blockchain.WithSlasherEnabled(b.slasherEnabled),
blockchain.WithLightClientStore(b.lcStore),
blockchain.WithOperationNotifier(b),
)
blockchainService, err := blockchain.NewService(b.ctx, opts...)
@@ -849,7 +837,6 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithStateNotifier(b),
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithDataColumnStorage(b.DataColumnStorage),
regularsync.WithExecutionProofStorage(b.ProofStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithTrackedValidatorsCache(b.trackedValidatorsCache),
@@ -976,7 +963,6 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
BlockReceiver: chainService,
BlobReceiver: chainService,
DataColumnReceiver: chainService,
ProofReceiver: chainService,
AttestationReceiver: chainService,
GenesisTimeFetcher: chainService,
GenesisFetcher: chainService,

View File

@@ -35,13 +35,6 @@ func WithBuilderFlagOptions(opts []builder.Option) Option {
}
}
func WithConfigOptions(opt ...params.Option) Option {
return func(bn *BeaconNode) error {
bn.ConfigOptions = append(bn.ConfigOptions, opt...)
return nil
}
}
// WithBlobStorage sets the BlobStorage backend for the BeaconNode
func WithBlobStorage(bs *filesystem.BlobStorage) Option {
return func(bn *BeaconNode) error {
@@ -59,6 +52,13 @@ func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
}
}
func WithConfigOptions(opt ...params.Option) Option {
return func(bn *BeaconNode) error {
bn.ConfigOptions = append(bn.ConfigOptions, opt...)
return nil
}
}
// WithDataColumnStorage sets the DataColumnStorage backend for the BeaconNode
func WithDataColumnStorage(bs *filesystem.DataColumnStorage) Option {
return func(bn *BeaconNode) error {
@@ -75,20 +75,3 @@ func WithDataColumnStorageOptions(opt ...filesystem.DataColumnStorageOption) Opt
return nil
}
}
// WithDataColumnStorage sets the DataColumnStorage backend for the BeaconNode
func WithProofStorage(bs *filesystem.ProofStorage) Option {
return func(bn *BeaconNode) error {
bn.ProofStorage = bs
return nil
}
}
// WithDataColumnStorageOptions appends 1 or more filesystem.DataColumnStorageOption on the beacon node,
// to be used when initializing data column storage.
func WithProofStorageOption(opt ...filesystem.ProofStorageOption) Option {
return func(bn *BeaconNode) error {
bn.ProofStorageOptions = append(bn.ProofStorageOptions, opt...)
return nil
}
}

View File

@@ -52,6 +52,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
@@ -166,7 +167,6 @@ go_test(
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state/stategen/mock:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -343,7 +343,7 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
// cancelled (whichever comes first).
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error {
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
@@ -353,16 +353,15 @@ func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []bl
return errors.Wrap(err, "current fork digest")
}
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars, partialColumns)
return nil
}
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network.
// For sidecars with available peers, it uses batch publishing.
// For sidecars without peers, it finds peers first and then publishes individually.
// Both paths run in parallel. It returns when all broadcasts are complete, or the context is cancelled.
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
@@ -405,7 +404,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
// Batch publish sidecars that already have peers
var messageBatch pubsub.MessageBatch
for _, sidecar := range sidecarsWithPeers {
for i, sidecar := range sidecarsWithPeers {
batchWg.Go(func() {
_, span := trace.StartSpan(ctx, "p2p.broadcastDataColumnSidecars")
ctx := trace.NewContext(s.ctx, span)
@@ -419,6 +418,25 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
return
}
if s.partialColumnBroadcaster != nil && i < len(partialColumns) {
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumns[i]); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot broadcast data column sidecar")
return
}
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
// Record the timing for log purposes.
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())

View File

@@ -224,6 +224,7 @@ func TestService_BroadcastAttestation(t *testing.T) {
}
func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
t.Skip("TODO: Unship flaky test once done rebasing branch")
const port = uint(2000)
// The DB has to be shared in all peers to avoid the
@@ -803,7 +804,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond, "libp2p mesh did not establish")
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar}, nil)
require.NoError(t, err)
// Receive the message.
@@ -867,7 +868,7 @@ func (*rpcOrderTracer) DeliverMessage(*pubsub.Message) {}
func (*rpcOrderTracer) RejectMessage(*pubsub.Message, string) {}
func (*rpcOrderTracer) DuplicateMessage(*pubsub.Message) {}
func (*rpcOrderTracer) ThrottlePeer(peer.ID) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) DropRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) UndeliverableMessage(*pubsub.Message) {}
@@ -878,6 +879,7 @@ func (*rpcOrderTracer) UndeliverableMessage(*pubsub.Message) {}
// Without batch publishing: A,A,A,A,B,B,B,B (all peers for column A, then all for column B)
// With batch publishing: A,B,A,B,A,B,A,B (interleaved by message ID)
func TestService_BroadcastDataColumnRoundRobin(t *testing.T) {
t.Skip("TODO: Aarsh will fix batch publishing for partial data columns")
const (
port = 2100
topicFormat = DataColumnSubnetTopicFormat
@@ -969,7 +971,7 @@ func TestService_BroadcastDataColumnRoundRobin(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Broadcast all sidecars.
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars)
err = service.BroadcastDataColumnSidecars(ctx, verifiedRoSidecars, nil)
require.NoError(t, err)
// Give some time for messages to be sent.
time.Sleep(100 * time.Millisecond)

View File

@@ -26,6 +26,7 @@ const (
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
PartialDataColumns bool
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool

View File

@@ -589,11 +589,6 @@ func (s *Service) createLocalNode(
localNode.Set(quicEntry)
}
if features.Get().EnableZkvm {
zkvmKeyEntry := enr.WithEntry(zkvmEnabledKeyEnrKey, true)
localNode.Set(zkvmKeyEntry)
}
localNode.SetFallbackIP(ipAddr)
localNode.SetFallbackUDP(udpPort)

View File

@@ -25,7 +25,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
testp2p "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/wrapper"
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
@@ -244,19 +243,12 @@ func TestCreateLocalNode(t *testing.T) {
name string
cfg *Config
expectedError bool
zkvmEnabled bool
}{
{
name: "valid config",
cfg: &Config{},
expectedError: false,
},
{
name: "valid config with zkVM enabled",
cfg: &Config{},
expectedError: false,
zkvmEnabled: true,
},
{
name: "invalid host address",
cfg: &Config{HostAddress: "invalid"},
@@ -281,15 +273,6 @@ func TestCreateLocalNode(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
if tt.zkvmEnabled {
resetCfg := features.InitWithReset(&features.Flags{
EnableZkvm: true,
})
t.Cleanup(func() {
resetCfg()
})
}
// Define ports. Use unique ports since this test validates ENR content.
const (
udpPort = 3100
@@ -365,14 +348,6 @@ func TestCreateLocalNode(t *testing.T) {
custodyGroupCount := new(uint64)
require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(params.BeaconNetworkConfig().CustodyGroupCountKey, custodyGroupCount)))
require.Equal(t, custodyRequirement, *custodyGroupCount)
// Check zkVM enabled key if applicable.
if tt.zkvmEnabled {
zkvmEnabled := new(bool)
require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, zkvmEnabled)))
require.Equal(t, features.Get().EnableZkvm, *zkvmEnabled)
}
})
}
}

View File

@@ -52,9 +52,6 @@ const (
// lightClientFinalityUpdateWeight specifies the scoring weight that we apply to
// our light client finality update topic.
lightClientFinalityUpdateWeight = 0.05
// executionProofWeight specifies the scoring weight that we apply to
// our execution proof topic.
executionProofWeight = 0.05
// maxInMeshScore describes the max score a peer can attain from being in the mesh.
maxInMeshScore = 10
@@ -148,8 +145,6 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
return defaultLightClientOptimisticUpdateTopicParams(), nil
case strings.Contains(topic, GossipLightClientFinalityUpdateMessage):
return defaultLightClientFinalityUpdateTopicParams(), nil
case strings.Contains(topic, GossipExecutionProofMessage):
return defaultExecutionProofTopicParams(), nil
default:
return nil, errors.Errorf("unrecognized topic provided for parameter registration: %s", topic)
}
@@ -515,28 +510,6 @@ func defaultBlsToExecutionChangeTopicParams() *pubsub.TopicScoreParams {
}
}
func defaultExecutionProofTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: executionProofWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 2,
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 5,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
MeshMessageDeliveriesCap: 0,
MeshMessageDeliveriesThreshold: 0,
MeshMessageDeliveriesWindow: 0,
MeshMessageDeliveriesActivation: 0,
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}
func defaultLightClientOptimisticUpdateTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: lightClientOptimisticUpdateWeight,

View File

@@ -25,7 +25,6 @@ var gossipTopicMappings = map[string]func() proto.Message{
LightClientOptimisticUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientOptimisticUpdateAltair{} },
LightClientFinalityUpdateTopicFormat: func() proto.Message { return &ethpb.LightClientFinalityUpdateAltair{} },
DataColumnSubnetTopicFormat: func() proto.Message { return &ethpb.DataColumnSidecar{} },
ExecutionProofSubnetTopicFormat: func() proto.Message { return &ethpb.ExecutionProof{} },
}
// GossipTopicMappings is a function to return the assigned data type

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -28,6 +29,7 @@ type (
Broadcaster
SetStreamHandler
PubSubProvider
PartialColumnBroadcasterProvider
PubSubTopicUser
SenderEncoder
PeerManager
@@ -52,7 +54,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn, partialColumns []blocks.PartialDataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -92,6 +94,11 @@ type (
PubSub() *pubsub.PubSub
}
// PubSubProvider provides the p2p pubsub protocol.
PartialColumnBroadcasterProvider interface {
PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// PeerManager abstracts some peer management methods from libp2p.
PeerManager interface {
Disconnect(peer.ID) error

View File

@@ -157,6 +157,11 @@ var (
Help: "The number of publish messages received via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubRecvSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_pub_size_total",
Help: "The total size of publish messages received via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_total",
Help: "The number of messages dropped via rpc for a particular control message",
@@ -171,6 +176,11 @@ var (
Help: "The number of publish messages dropped via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubDropSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_pub_size_total",
Help: "The total size of publish messages dropped via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_total",
Help: "The number of messages sent via rpc for a particular control message",
@@ -185,6 +195,16 @@ var (
Help: "The number of publish messages sent via rpc for a particular topic",
},
[]string{"topic"})
pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
Help: "The total size of publish messages sent via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gossipsub_mesh_peers",
Help: "The number of capable peers in mesh",
},
[]string{"topic", "supports_partial"})
)
func (s *Service) updateMetrics() {

View File

@@ -0,0 +1,28 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"metrics.go",
"partial.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster",
visibility = ["//visibility:public"],
deps = [
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//internal/logrusadapter:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -0,0 +1,25 @@
load("@prysm//tools/go:def.bzl", "go_test")
go_test(
name = "go_default_test",
size = "medium",
srcs = ["two_node_test.go"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//x/simlibp2p:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_marcopolo_simnet//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -0,0 +1,239 @@
package integrationtest
import (
"context"
"crypto/rand"
"fmt"
"testing"
"testing/synctest"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
simlibp2p "github.com/libp2p/go-libp2p/x/simlibp2p"
"github.com/marcopolo/simnet"
"github.com/sirupsen/logrus"
)
// TestTwoNodePartialColumnExchange tests that two nodes can exchange partial columns
// and reconstruct the complete column. Node 1 has cells 0-2, Node 2 has cells 3-5.
// After exchange, both should have all cells.
func TestTwoNodePartialColumnExchange(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// Create a simulated libp2p network
latency := time.Millisecond * 10
network, meta, err := simlibp2p.SimpleLibp2pNetwork([]simlibp2p.NodeLinkSettingsAndCount{
{LinkSettings: simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
Uplink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
}, Count: 2},
}, simlibp2p.NetworkSettings{UseBlankHost: true})
require.NoError(t, err)
require.NoError(t, network.Start())
defer func() {
require.NoError(t, network.Close())
}()
defer func() {
for _, node := range meta.Nodes {
err := node.Close()
if err != nil {
panic(err)
}
}
}()
h1 := meta.Nodes[0]
h2 := meta.Nodes[1]
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
broadcaster1 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
broadcaster2 := partialdatacolumnbroadcaster.NewBroadcaster(logger)
opts1 := broadcaster1.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
opts2 := broadcaster2.AppendPubSubOpts([]pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ps1, err := pubsub.NewGossipSub(ctx, h1, opts1...)
require.NoError(t, err)
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
}()
// Generate Test Data
var blockRoot [fieldparams.RootLength]byte
copy(blockRoot[:], []byte("test-block-root"))
numCells := 6
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
commitments[i] = make([]byte, 48)
cells[i] = make([]byte, 2048)
_, err := rand.Read(cells[i])
require.NoError(t, err)
proofs[i] = make([]byte, 48)
_ = fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{
BodyRoot: blockRoot[:],
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
pc1, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
pc2, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
require.NoError(t, err)
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
// Setup Topic and Subscriptions
digest := params.ForkDigest(0)
columnIndex := uint64(12)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topicStr := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) +
encoder.SszNetworkEncoder{}.ProtocolSuffix()
time.Sleep(100 * time.Millisecond)
topic1, err := ps1.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator that verifies the inclusion proof
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
}
if header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return true, fmt.Errorf("nil signed block header")
}
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
cellValidator := func(_ []blocks.CellProofBundle) error {
return nil
}
node1Complete := make(chan blocks.VerifiedRODataColumn, 1)
node2Complete := make(chan blocks.VerifiedRODataColumn, 1)
handler1 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 1: Completed! Column has %d cells", len(col.Column))
node1Complete <- col
}
handler2 := func(topic string, col blocks.VerifiedRODataColumn) {
t.Logf("Node 2: Completed! Column has %d cells", len(col.Column))
node2Complete <- col
}
// Connect hosts
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
time.Sleep(300 * time.Millisecond)
// Subscribe to regular GossipSub (critical for partial message RPC exchange!)
sub1, err := topic1.Subscribe()
require.NoError(t, err)
defer sub1.Cancel()
sub2, err := topic2.Subscribe()
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
require.NoError(t, err)
// Wait for mesh to form
time.Sleep(2 * time.Second)
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2)
require.NoError(t, err)
// Wait for Completion
timeout := time.After(10 * time.Second)
var col1, col2 blocks.VerifiedRODataColumn
receivedCount := 0
for receivedCount < 2 {
select {
case col1 = <-node1Complete:
t.Log("Node 1 completed reconstruction")
receivedCount++
case col2 = <-node2Complete:
t.Log("Node 2 completed reconstruction")
receivedCount++
case <-timeout:
t.Fatalf("Timeout: Only %d/2 nodes completed", receivedCount)
}
}
// Verify both columns have all cells
assert.Equal(t, numCells, len(col1.Column), "Node 1 should have all cells")
assert.Equal(t, numCells, len(col2.Column), "Node 2 should have all cells")
assert.DeepSSZEqual(t, cells, col1.Column, "Node 1 cell mismatch")
assert.DeepSSZEqual(t, cells, col2.Column, "Node 2 cell mismatch")
})
}

View File

@@ -0,0 +1,9 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package partialdatacolumnbroadcaster
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "beacon-chain/p2p/partialdatacolumnbroadcaster")

View File

@@ -0,0 +1,18 @@
package partialdatacolumnbroadcaster
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
partialMessageUsefulCellsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_useful_cells_total",
Help: "Number of useful cells received via a partial message",
}, []string{"column_index"})
partialMessageCellsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_cells_received_total",
Help: "Number of total cells received via a partial message",
}, []string{"column_index"})
)

View File

@@ -0,0 +1,544 @@
package partialdatacolumnbroadcaster
import (
"bytes"
"log/slog"
"regexp"
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// TODOs:
// different eager push strategies:
// - no eager push
// - full column eager push
// - With debouncing - some factor of RTT
// - eager push missing cells
const TTLInSlots = 3
const maxConcurrentValidators = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
func extractColumnIndexFromTopic(topic string) (uint64, error) {
matches := dataColumnTopicRegex.FindStringSubmatch(topic)
if len(matches) < 2 {
return 0, errors.New("could not extract column index from topic")
}
return strconv.ParseUint(matches[1], 10, 64)
}
// HeaderValidator validates a PartialDataColumnHeader.
// Returns (reject, err) where:
// - reject=true, err!=nil: REJECT - peer should be penalized
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
logger *logrus.Logger
ps *pubsub.PubSub
stop chan struct{}
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
// map topic -> handler
handlers map[string]SubHandler
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
groupTTL map[string]int8
// validHeaderCache caches validated headers by group ID (works across topics)
validHeaderCache map[string]*ethpb.PartialDataColumnHeader
incomingReq chan request
}
type requestKind uint8
const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindHandleIncomingRPC
requestKindCellsValidated
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
}
type publish struct {
topic string
c blocks.PartialDataColumn
}
type subscribe struct {
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
}
type unsubscribe struct {
topic string
}
type rpcWithFrom struct {
*pubsub_pb.PartialMessagesExtension
from peer.ID
}
type cellsValidated struct {
validationTook time.Duration
topic string
group []byte
cellIndices []uint64
cells []blocks.CellProofBundle
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
}
}
// AppendPubSubOpts adds the necessary pubsub options to enable partial messages.
func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubsub.Option {
slogger := slog.New(logrusadapter.Handler{Logger: p.logger})
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
if len(left) == 0 {
return right
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return partialmessages.PartsMetadata(merged)
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
incomingRPC: rpcWithFrom{rpc, from},
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
}
return nil
},
}),
func(ps *pubsub.PubSub) error {
p.ps = ps
return nil
},
)
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
if p.stop != nil {
return
}
p.stop = make(chan struct{})
p.loop()
}
func (p *PartialColumnBroadcaster) loop() {
cleanup := time.NewTicker(time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot))
defer cleanup.Stop()
for {
select {
case <-p.stop:
return
case <-cleanup.C:
for groupID, ttl := range p.groupTTL {
if ttl > 0 {
p.groupTTL[groupID] = ttl - 1
continue
}
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
for topic, msgStore := range p.partialMsgStore {
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
}
}
}
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
p.logger.Error("Failed to handle incoming partial RPC", "err", err)
}
case requestKindCellsValidated:
err := p.handleCellsValidated(req.cellsValidated)
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
}
}
}
func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *blocks.PartialDataColumn {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
return nil
}
msg, ok := topicStore[string(group)]
if !ok {
return nil
}
return msg
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
hasMessage := len(rpcWithFrom.PartialMessage) > 0
var message ethpb.PartialDataColumnSidecar
if hasMessage {
err := message.UnmarshalSSZ(rpcWithFrom.PartialMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal partial message data")
}
}
topicID := rpcWithFrom.GetTopicID()
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
// Check cache first for this group
if cachedHeader, ok := p.validHeaderCache[string(groupID)]; ok {
header = cachedHeader
} else {
// We haven't seen this group before. Check if we have a valid header.
if len(message.Header) == 0 {
p.logger.Debug("No partial column found and no header in message, ignoring")
return nil
}
header = message.Header[0]
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
// REJECT case: penalize the peer
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
}
// Both REJECT and IGNORE: don't process further
return nil
}
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
if err != nil {
return err
}
newColumn, err := blocks.NewPartialDataColumn(
header.SignedBlockHeader,
columnIndex,
header.KzgCommitments,
header.KzgCommitmentsInclusionProof,
)
if err != nil {
p.logger.WithError(err).WithFields(logrus.Fields{
"topic": topicID,
"columnIndex": columnIndex,
"numCommitments": len(header.KzgCommitments),
}).Error("Failed to create partial data column from header")
return err
}
// Save to store
topicStore, ok := p.partialMsgStore[topicID]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topicID] = topicStore
}
topicStore[string(newColumn.GroupID())] = &newColumn
p.groupTTL[string(newColumn.GroupID())] = TTLInSlots
ourDataColumn = &newColumn
shouldRepublish = true
}
if ourDataColumn == nil {
// We don't have a partial column for this. Can happen if we got cells
// without a header.
return nil
}
logger := p.logger.WithFields(logrus.Fields{
"from": rpcWithFrom.from,
"topic": topicID,
"group": groupID,
})
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
// Marco's thoughts. No, we don't need to do anything else here.
cellIndices, cellsToVerify, err := ourDataColumn.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return err
}
// Track cells received via partial message
if len(cellIndices) > 0 {
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
partialMessageCellsReceivedTotal.WithLabelValues(columnIndexStr).Add(float64(len(cellIndices)))
}
if len(cellsToVerify) > 0 {
p.concurrentValidatorSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := validator(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
return
}
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackUsefulMessage)
p.incomingReq <- request{
kind: requestKindCellsValidated,
cellsValidated: &cellsValidated{
validationTook: time.Since(start),
topic: topicID,
group: groupID,
cells: cellsToVerify,
cellIndices: cellIndices,
},
}
}()
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) error {
ourDataColumn := p.getDataColumn(cells.topic, cells.group)
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
if extended {
// Track useful cells (cells that extended our data)
partialMessageUsefulCellsTotal.WithLabelValues(columnIndexStr).Add(float64(len(cells.cells)))
// TODO: we could use the heuristic here that if this data was
// useful to us, it's likely useful to our peers and we should
// republish eagerly
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
}
}
return nil
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
p.stop = nil
}
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
topicStore[string(c.GroupID())] = &c
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
func (p *PartialColumnBroadcaster) Unsubscribe(topic string) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindUnsubscribe,
unsub: unsubscribe{
topic: topic,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
t, ok := p.topics[topic]
if !ok {
return errors.New("topic not found")
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}

View File

@@ -602,33 +602,6 @@ func (p *Status) All() []peer.ID {
return pids
}
// ZkvmEnabledPeers returns all connected peers that have zkvm enabled in their ENR.
func (p *Status) ZkvmEnabledPeers() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState != Connected {
continue
}
if peerData.Enr == nil {
continue
}
var enabled bool
entry := enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, &enabled)
if err := peerData.Enr.Load(entry); err != nil {
continue
}
if enabled {
peers = append(peers, pid)
}
}
return peers
}
// Prune clears out and removes outdated and disconnected peers.
func (p *Status) Prune() {
p.store.Lock()

View File

@@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address, resAddress, "Unexpected address")
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")
resDirection, err := p.Direction(id)
require.NoError(t, err)
@@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {
resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2, resAddress2, "Unexpected address")
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")
resDirection2, err := p.Direction(id)
require.NoError(t, err)
@@ -1341,75 +1341,3 @@ func createPeer(t *testing.T, p *peers.Status, addr ma.Multiaddr,
p.SetConnectionState(id, state)
return id
}
func TestZkvmEnabledPeers(t *testing.T) {
p := peers.NewStatus(t.Context(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 1,
},
},
})
// Create peer 1: Connected, zkVM enabled
pid1 := addPeer(t, p, peers.Connected)
record1 := new(enr.Record)
zkvmEnabled := true
record1.Set(enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, &zkvmEnabled))
p.Add(record1, pid1, nil, network.DirOutbound)
p.SetConnectionState(pid1, peers.Connected)
// Create peer 2: Connected, zkVM disabled
pid2 := addPeer(t, p, peers.Connected)
record2 := new(enr.Record)
zkvmDisabled := false
record2.Set(enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, &zkvmDisabled))
p.Add(record2, pid2, nil, network.DirOutbound)
p.SetConnectionState(pid2, peers.Connected)
// Create peer 3: Connected, zkVM enabled
pid3 := addPeer(t, p, peers.Connected)
record3 := new(enr.Record)
record3.Set(enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, &zkvmEnabled))
p.Add(record3, pid3, nil, network.DirOutbound)
p.SetConnectionState(pid3, peers.Connected)
// Create peer 4: Disconnected, zkVM enabled (should not be included)
pid4 := addPeer(t, p, peers.Disconnected)
record4 := new(enr.Record)
record4.Set(enr.WithEntry(params.BeaconNetworkConfig().ZkvmEnabledKey, &zkvmEnabled))
p.Add(record4, pid4, nil, network.DirOutbound)
p.SetConnectionState(pid4, peers.Disconnected)
// Create peer 5: Connected, no ENR (should not be included)
pid5 := addPeer(t, p, peers.Connected)
p.Add(nil, pid5, nil, network.DirOutbound)
p.SetConnectionState(pid5, peers.Connected)
// Create peer 6: Connected, no zkVM key in ENR (should not be included)
pid6 := addPeer(t, p, peers.Connected)
record6 := new(enr.Record)
record6.Set(enr.WithEntry("other_key", "other_value"))
p.Add(record6, pid6, nil, network.DirOutbound)
p.SetConnectionState(pid6, peers.Connected)
// Get zkVM enabled peers
zkvmPeers := p.ZkvmEnabledPeers()
// Should return only pid1 and pid3 (connected peers with zkVM enabled)
assert.Equal(t, 2, len(zkvmPeers), "Expected 2 zkVM enabled peers")
// Verify the returned peers are correct
zkvmPeerMap := make(map[peer.ID]bool)
for _, pid := range zkvmPeers {
zkvmPeerMap[pid] = true
}
assert.Equal(t, true, zkvmPeerMap[pid1], "pid1 should be in zkVM enabled peers")
assert.Equal(t, true, zkvmPeerMap[pid3], "pid3 should be in zkVM enabled peers")
assert.Equal(t, false, zkvmPeerMap[pid2], "pid2 should not be in zkVM enabled peers (disabled)")
assert.Equal(t, false, zkvmPeerMap[pid4], "pid4 should not be in zkVM enabled peers (disconnected)")
assert.Equal(t, false, zkvmPeerMap[pid5], "pid5 should not be in zkVM enabled peers (no ENR)")
assert.Equal(t, false, zkvmPeerMap[pid6], "pid6 should not be in zkVM enabled peers (no zkVM key)")
}

View File

@@ -170,7 +170,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams(s.cfg.IPColocationWhitelist)),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
pubsub.WithRawTracer(&gossipTracer{host: s.host}),
}
if len(s.cfg.StaticPeers) > 0 {
@@ -181,6 +181,9 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}
psOpts = append(psOpts, pubsub.WithDirectPeers(directPeersAddrInfos))
}
if s.partialColumnBroadcaster != nil {
psOpts = s.partialColumnBroadcaster.AppendPubSubOpts(psOpts)
}
return psOpts
}

View File

@@ -1,6 +1,8 @@
package p2p
import (
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@@ -8,7 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
var _ = pubsub.RawTracer(gossipTracer{})
var _ = pubsub.RawTracer(&gossipTracer{})
// Initializes the values for the pubsub rpc action.
type action int
@@ -23,85 +25,146 @@ const (
// and broadcasted through gossipsub.
type gossipTracer struct {
host host.Host
mu sync.Mutex
// map topic -> Set(peerID). Peer is in set if it supports partial messages.
partialMessagePeers map[string]map[peer.ID]struct{}
// map topic -> Set(peerID). Peer is in set if in the mesh.
meshPeers map[string]map[peer.ID]struct{}
}
// AddPeer .
func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
func (g *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
// no-op
}
// RemovePeer .
func (g gossipTracer) RemovePeer(p peer.ID) {
// no-op
func (g *gossipTracer) RemovePeer(p peer.ID) {
g.mu.Lock()
defer g.mu.Unlock()
for _, peers := range g.partialMessagePeers {
delete(peers, p)
}
for topic, peers := range g.meshPeers {
if _, ok := peers[p]; ok {
delete(peers, p)
g.updateMeshPeersMetric(topic)
}
}
}
// Join .
func (g gossipTracer) Join(topic string) {
func (g *gossipTracer) Join(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(1)
g.mu.Lock()
defer g.mu.Unlock()
if g.partialMessagePeers == nil {
g.partialMessagePeers = make(map[string]map[peer.ID]struct{})
}
if g.partialMessagePeers[topic] == nil {
g.partialMessagePeers[topic] = make(map[peer.ID]struct{})
}
if g.meshPeers == nil {
g.meshPeers = make(map[string]map[peer.ID]struct{})
}
if g.meshPeers[topic] == nil {
g.meshPeers[topic] = make(map[peer.ID]struct{})
}
}
// Leave .
func (g gossipTracer) Leave(topic string) {
func (g *gossipTracer) Leave(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(0)
g.mu.Lock()
defer g.mu.Unlock()
delete(g.partialMessagePeers, topic)
delete(g.meshPeers, topic)
}
// Graft .
func (g gossipTracer) Graft(p peer.ID, topic string) {
func (g *gossipTracer) Graft(p peer.ID, topic string) {
pubsubTopicsGraft.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
m[p] = struct{}{}
}
g.updateMeshPeersMetric(topic)
}
// Prune .
func (g gossipTracer) Prune(p peer.ID, topic string) {
func (g *gossipTracer) Prune(p peer.ID, topic string) {
pubsubTopicsPrune.WithLabelValues(topic).Inc()
g.mu.Lock()
defer g.mu.Unlock()
if m, ok := g.meshPeers[topic]; ok {
delete(m, p)
}
g.updateMeshPeersMetric(topic)
}
// ValidateMessage .
func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
func (g *gossipTracer) ValidateMessage(msg *pubsub.Message) {
pubsubMessageValidate.WithLabelValues(*msg.Topic).Inc()
}
// DeliverMessage .
func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
func (g *gossipTracer) DeliverMessage(msg *pubsub.Message) {
pubsubMessageDeliver.WithLabelValues(*msg.Topic).Inc()
}
// RejectMessage .
func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
func (g *gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
pubsubMessageReject.WithLabelValues(*msg.Topic, reason).Inc()
}
// DuplicateMessage .
func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
func (g *gossipTracer) DuplicateMessage(msg *pubsub.Message) {
pubsubMessageDuplicate.WithLabelValues(*msg.Topic).Inc()
}
// UndeliverableMessage .
func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
func (g *gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
pubsubMessageUndeliverable.WithLabelValues(*msg.Topic).Inc()
}
// ThrottlePeer .
func (g gossipTracer) ThrottlePeer(p peer.ID) {
func (g *gossipTracer) ThrottlePeer(p peer.ID) {
agent := agentFromPid(p, g.host.Peerstore())
pubsubPeerThrottle.WithLabelValues(agent).Inc()
}
// RecvRPC .
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCRecv, rpc)
func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC, from peer.ID) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCPubRecvSize, pubsubRPCRecv, rpc)
g.mu.Lock()
defer g.mu.Unlock()
for _, sub := range rpc.Subscriptions {
m, ok := g.partialMessagePeers[sub.GetTopicid()]
if !ok {
continue
}
if sub.GetSubscribe() && sub.GetRequestsPartial() {
m[from] = struct{}{}
} else {
delete(m, from)
}
}
}
// SendRPC .
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCSent, rpc)
func (g *gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(send, pubsubRPCSubSent, pubsubRPCPubSent, pubsubRPCPubSentSize, pubsubRPCSent, rpc)
}
// DropRPC .
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCDrop, rpc)
func (g *gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
g.setMetricFromRPC(drop, pubsubRPCSubDrop, pubsubRPCPubDrop, pubsubRPCPubDropSize, pubsubRPCDrop, rpc)
}
func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
func (g *gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pubCtr, pubSizeCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) {
subCtr.Add(float64(len(rpc.Subscriptions)))
if rpc.Control != nil {
ctrlCtr.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
@@ -110,12 +173,41 @@ func (g gossipTracer) setMetricFromRPC(act action, subCtr prometheus.Counter, pu
ctrlCtr.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
ctrlCtr.WithLabelValues("idontwant").Add(float64(len(rpc.Control.Idontwant)))
}
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
return
}
for _, msg := range rpc.Publish {
// For incoming messages from pubsub, we do not record metrics for them as these values
// could be junk.
if act == recv {
continue
}
pubCtr.WithLabelValues(*msg.Topic).Inc()
pubCtr.WithLabelValues(msg.GetTopic()).Inc()
pubSizeCtr.WithLabelValues(msg.GetTopic(), "false").Add(float64(msg.Size()))
}
if rpc.Partial != nil {
pubCtr.WithLabelValues(rpc.Partial.GetTopicID()).Inc()
pubSizeCtr.WithLabelValues(rpc.Partial.GetTopicID(), "true").Add(float64(rpc.Partial.Size()))
}
}
// updateMeshPeersMetric requires the caller to hold the state mutex
func (g *gossipTracer) updateMeshPeersMetric(topic string) {
meshPeers, ok := g.meshPeers[topic]
if !ok {
return
}
partialPeers, ok := g.partialMessagePeers[topic]
if !ok {
return
}
var supportsPartial, doesNotSupportPartial float64
for p := range meshPeers {
if _, ok := partialPeers[p]; ok {
supportsPartial++
} else {
doesNotSupportPartial++
}
}
pubsubMeshPeers.WithLabelValues(topic, "true").Set(supportsPartial)
pubsubMeshPeers.WithLabelValues(topic, "false").Set(doesNotSupportPartial)
}

View File

@@ -67,9 +67,6 @@ const (
// DataColumnSidecarsByRangeName is the name for the DataColumnSidecarsByRange v1 message topic.
DataColumnSidecarsByRangeName = "/data_column_sidecars_by_range"
// ExecutionProofsByRootName is the name for the ExecutionProofsByRoot v1 message topic.
ExecutionProofsByRootName = "/execution_proofs_by_root"
)
const (
@@ -109,9 +106,6 @@ const (
// RPCDataColumnSidecarsByRangeTopicV1 is a topic for requesting data column sidecars by their slot.
// /eth2/beacon_chain/req/data_column_sidecars_by_range/1 - New in Fulu.
RPCDataColumnSidecarsByRangeTopicV1 = protocolPrefix + DataColumnSidecarsByRangeName + SchemaVersionV1
// RPCExecutionProofsByRootTopicV1 is a topic for requesting execution proofs by their block root.
// /eth2/beacon_chain/req/execution_proofs_by_root/1 - New in Fulu.
RPCExecutionProofsByRootTopicV1 = protocolPrefix + ExecutionProofsByRootName + SchemaVersionV1
// V2 RPC Topics
// RPCStatusTopicV2 defines the v1 topic for the status rpc method.
@@ -176,9 +170,6 @@ var (
// DataColumnSidecarsByRoot v1 Message
RPCDataColumnSidecarsByRootTopicV1: p2ptypes.DataColumnsByRootIdentifiers{},
// ExecutionProofsByRoot v1 Message
RPCExecutionProofsByRootTopicV1: new(pb.ExecutionProofsByRootRequest),
}
// Maps all registered protocol prefixes.
@@ -202,7 +193,6 @@ var (
LightClientOptimisticUpdateName: true,
DataColumnSidecarsByRootName: true,
DataColumnSidecarsByRangeName: true,
ExecutionProofsByRootName: true,
}
// Maps all the RPC messages which are to updated in altair.

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/async"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
@@ -77,6 +78,7 @@ type Service struct {
privKey *ecdsa.PrivateKey
metaData metadata.Metadata
pubsub *pubsub.PubSub
partialColumnBroadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
joinedTopics map[string]*pubsub.Topic
joinedTopicsLock sync.RWMutex
subnetsLock map[uint64]*sync.RWMutex
@@ -147,6 +149,10 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
custodyInfoSet: make(chan struct{}),
}
if cfg.PartialDataColumns {
s.partialColumnBroadcaster = partialdatacolumnbroadcaster.NewBroadcaster(log.Logger)
}
ipAddr := prysmnetwork.IPAddr()
opts, err := s.buildOptions(ipAddr, s.privKey)
@@ -305,6 +311,10 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -314,6 +324,10 @@ func (s *Service) Stop() error {
if s.dv5Listener != nil {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}
@@ -350,6 +364,10 @@ func (s *Service) PubSub() *pubsub.PubSub {
return s.pubsub
}
func (s *Service) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return s.partialColumnBroadcaster
}
// Host returns the currently running libp2p
// host of the service.
func (s *Service) Host() host.Host {

View File

@@ -36,7 +36,6 @@ var (
attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
custodyGroupCountEnrKey = params.BeaconNetworkConfig().CustodyGroupCountKey
zkvmEnabledKeyEnrKey = params.BeaconNetworkConfig().ZkvmEnabledKey
)
// The value used with the subnet, in order

View File

@@ -21,6 +21,7 @@ go_library(
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -108,6 +109,10 @@ func (*FakeP2P) PubSub() *pubsub.PubSub {
return nil
}
func (*FakeP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// MetadataSeq -- fake.
func (*FakeP2P) MetadataSeq() uint64 {
return 0
@@ -169,7 +174,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn, _ []blocks.PartialDataColumn) error {
return nil
}

View File

@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -242,7 +243,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn, []blocks.PartialDataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}
@@ -308,6 +309,10 @@ func (p *TestP2P) PubSub() *pubsub.PubSub {
return p.pubsub
}
func (p *TestP2P) PartialColumnBroadcaster() *partialdatacolumnbroadcaster.PartialColumnBroadcaster {
return nil
}
// Disconnect from a peer.
func (p *TestP2P) Disconnect(pid peer.ID) error {
return p.BHost.Network().ClosePeer(pid)

View File

@@ -46,8 +46,6 @@ const (
GossipLightClientOptimisticUpdateMessage = "light_client_optimistic_update"
// GossipDataColumnSidecarMessage is the name for the data column sidecar message type.
GossipDataColumnSidecarMessage = "data_column_sidecar"
// GossipExecutionProofMessage is the name for the execution proof message type.
GossipExecutionProofMessage = "execution_proof"
// Topic Formats
//
@@ -77,8 +75,6 @@ const (
LightClientOptimisticUpdateTopicFormat = GossipProtocolAndDigest + GossipLightClientOptimisticUpdateMessage
// DataColumnSubnetTopicFormat is the topic format for the data column subnet.
DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d"
// ExecutionProofSubnetTopicFormat is the topic format for the execution proof subnet.
ExecutionProofSubnetTopicFormat = GossipProtocolAndDigest + GossipExecutionProofMessage // + "_%d" (PoC only have one global topic)
)
// topic is a struct representing a single gossipsub topic.
@@ -162,7 +158,6 @@ func (s *Service) allTopics() []topic {
newTopic(altair, future, empty, GossipLightClientOptimisticUpdateMessage),
newTopic(altair, future, empty, GossipLightClientFinalityUpdateMessage),
newTopic(capella, future, empty, GossipBlsToExecutionChangeMessage),
newTopic(fulu, future, empty, GossipExecutionProofMessage),
}
last := params.GetNetworkScheduleEntry(genesis)
schedule := []params.NetworkScheduleEntry{last}

View File

@@ -26,8 +26,8 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"

View File

@@ -178,11 +178,6 @@ func TestGetSpec(t *testing.T) {
config.BuilderPaymentThresholdNumerator = 104
config.BuilderPaymentThresholdDenominator = 105
// EIP-8025
config.MaxProofDataBytes = 200
config.MinEpochsForExecutionProofRequests = 201
config.MinProofsRequired = 202
var dbp [4]byte
copy(dbp[:], []byte{'0', '0', '0', '1'})
config.DomainBeaconProposer = dbp
@@ -615,12 +610,6 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "102", v)
case "SYNC_MESSAGE_DUE_BPS":
assert.Equal(t, "103", v)
case "MAX_PROOF_DATA_BYTES":
assert.Equal(t, "200", v)
case "MIN_EPOCHS_FOR_EXECUTION_PROOF_REQUESTS":
assert.Equal(t, "201", v)
case "MIN_PROOFS_REQUIRED":
assert.Equal(t, "202", v)
case "BUILDER_PAYMENT_THRESHOLD_NUMERATOR":
assert.Equal(t, "104", v)
case "BUILDER_PAYMENT_THRESHOLD_DENOMINATOR":

View File

@@ -42,7 +42,6 @@ go_library(
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/builder:go_default_library",
"//beacon-chain/cache:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//beacon-chain/cache/depositsnapshot:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/electra:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/OffchainLabs/go-bitfield"
builderapi "github.com/OffchainLabs/prysm/v7/api/client/builder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/builder"
@@ -19,7 +20,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/kv"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -309,6 +309,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
}
rob, err := blocks.NewROBlockWithRoot(block, root)
var partialColumns []blocks.PartialDataColumn
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
if errors.Is(err, builderapi.ErrBadGateway) {
@@ -316,99 +317,47 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
blobSidecars, dataColumnSidecars, partialColumns, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}
var wg errgroup.Group
blockBroadcastDone := make(chan bool)
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Go(func() error {
if err := vs.broadcastReceiveBlock(ctx, blockBroadcastDone, block, root); err != nil {
return fmt.Errorf("broadcast receive block: %w", err)
wg.Add(1)
go func() {
if err := vs.broadcastReceiveBlock(ctx, &wg, block, root); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
}
errChan <- nil
}()
return nil
})
wg.Wait()
wg.Go(func() error {
if err := vs.broadcastAndReceiveSidecars(ctx, blockBroadcastDone, block, root, blobSidecars, dataColumnSidecars); err != nil {
return fmt.Errorf("broadcast and receive sidecars: %w", err)
}
return nil
})
if err := wg.Wait(); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block/sidecars: %v", err)
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars, partialColumns); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
}
if err := <-errChan; err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
}
// Generate and broadcast execution proofs.
go vs.generateAndBroadcastExecutionProofs(ctx, rob)
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
// TODO: This is a duplicate from the same function in the sync package.
func (vs *Server) generateAndBroadcastExecutionProofs(ctx context.Context, roBlock blocks.ROBlock) {
const delay = 2 * time.Second
proofTypes := flags.Get().ProofGenerationTypes
if len(proofTypes) == 0 {
return
}
var wg errgroup.Group
for _, proofType := range proofTypes {
wg.Go(func() error {
execProof, err := generateExecProof(roBlock, primitives.ExecutionProofId(proofType), delay)
if err != nil {
return fmt.Errorf("generate exec proof: %w", err)
}
if err := vs.P2P.Broadcast(ctx, execProof); err != nil {
return fmt.Errorf("broadcast exec proof: %w", err)
}
// Save the proof to storage.
if vs.ProofReceiver != nil {
if err := vs.ProofReceiver.ReceiveProof(execProof); err != nil {
return fmt.Errorf("receive proof: %w", err)
}
}
return nil
})
}
if err := wg.Wait(); err != nil {
log.WithError(err).Error("Failed to generate and broadcast execution proofs")
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", roBlock.Root()),
"slot": roBlock.Block().Slot(),
"count": len(proofTypes),
}).Debug("Generated and broadcasted execution proofs")
}
// broadcastAndReceiveSidecars broadcasts and receives sidecars.
func (vs *Server) broadcastAndReceiveSidecars(
ctx context.Context,
blockBroadcastDone <-chan bool,
block interfaces.SignedBeaconBlock,
root [fieldparams.RootLength]byte,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSidecars []blocks.RODataColumn,
partialColumns []blocks.PartialDataColumn,
) error {
// Wait for block broadcast to complete before broadcasting sidecars.
<-blockBroadcastDone
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, partialColumns); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -457,45 +406,49 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, []blocks.PartialDataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if block.Version() >= version.Fulu {
// Compute cells and proofs from the blobs and cell proofs.
cellsPerBlob, proofsPerBlob, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "compute cells and proofs")
return nil, nil, nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, nil
included := bitfield.NewBitlist(uint64(len(cellsPerBlob)))
included = included.Not() // all bits set to 1
partialColumns, err := peerdas.PartialColumns(included, cellsPerBlob, proofsPerBlob, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, partialColumns, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "build blob sidecars")
return nil, nil, nil, errors.Wrap(err, "build blob sidecars")
}
return blobSidecars, nil, nil
return blobSidecars, nil, nil, nil
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
// It closes the blockBroadcastDone channel once broadcasting is complete (but before receiving the block).
func (vs *Server) broadcastReceiveBlock(ctx context.Context, blockBroadcastDone chan<- bool, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
if err := vs.broadcastBlock(ctx, block, root); err != nil {
func (vs *Server) broadcastReceiveBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
if err := vs.broadcastBlock(ctx, wg, block, root); err != nil {
return errors.Wrap(err, "broadcast block")
}
close(blockBroadcastDone)
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
@@ -508,7 +461,9 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, blockBroadcastDone
return nil
}
func (vs *Server) broadcastBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
func (vs *Server) broadcastBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
defer wg.Done()
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
@@ -552,7 +507,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
}
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn) error {
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn, partialColumns []blocks.PartialDataColumn) error {
// We built this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, sidecar := range roSidecars {
@@ -561,7 +516,7 @@ func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars
}
// Broadcast sidecars (non blocking).
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars); err != nil {
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars, partialColumns); err != nil {
return errors.Wrap(err, "broadcast data column sidecars")
}
@@ -764,57 +719,3 @@ func blobsAndProofs(req *ethpb.GenericSignedBeaconBlock) ([][]byte, [][]byte, er
return nil, nil, errors.Errorf("unknown request type provided: %T", req)
}
}
// generateExecProof returns a dummy execution proof after the specified delay.
// TODO: This is a duplicate from the same function in the sync package.
func generateExecProof(roBlock blocks.ROBlock, proofID primitives.ExecutionProofId, delay time.Duration) (*ethpb.ExecutionProof, error) {
// Simulate proof generation work
time.Sleep(delay)
// Create a dummy proof with some deterministic data
block := roBlock.Block()
if block == nil {
return nil, errors.New("nil block")
}
body := block.Body()
if body == nil {
return nil, errors.New("nil block body")
}
executionData, err := body.Execution()
if err != nil {
return nil, fmt.Errorf("execution: %w", err)
}
if executionData == nil {
return nil, errors.New("nil execution data")
}
hash, err := executionData.HashTreeRoot()
if err != nil {
return nil, fmt.Errorf("hash tree root: %w", err)
}
proofData := []byte{
0xFF, // Magic byte for dummy proof
byte(proofID),
// Include some payload hash bytes
hash[0],
hash[1],
hash[2],
hash[3],
}
blockRoot := roBlock.Root()
proof := &ethpb.ExecutionProof{
ProofId: proofID,
Slot: block.Slot(),
BlockHash: hash[:],
BlockRoot: blockRoot[:],
ProofData: proofData,
}
return proof, nil
}

View File

@@ -70,7 +70,6 @@ type Server struct {
BlockReceiver blockchain.BlockReceiver
BlobReceiver blockchain.BlobReceiver
DataColumnReceiver blockchain.DataColumnReceiver
ProofReceiver blockchain.ProofReceiver
MockEth1Votes bool
Eth1BlockFetcher execution.POWBlockFetcher
PendingDepositsFetcher depositsnapshot.PendingDepositsFetcher

View File

@@ -90,7 +90,6 @@ type Config struct {
BlockReceiver blockchain.BlockReceiver
BlobReceiver blockchain.BlobReceiver
DataColumnReceiver blockchain.DataColumnReceiver
ProofReceiver blockchain.ProofReceiver
ExecutionChainService execution.Chain
ChainStartFetcher execution.ChainStartFetcher
ExecutionChainInfoFetcher execution.ChainInfoFetcher
@@ -241,7 +240,6 @@ func NewService(ctx context.Context, cfg *Config) *Service {
BlockReceiver: s.cfg.BlockReceiver,
BlobReceiver: s.cfg.BlobReceiver,
DataColumnReceiver: s.cfg.DataColumnReceiver,
ProofReceiver: s.cfg.ProofReceiver,
MockEth1Votes: s.cfg.MockEth1Votes,
Eth1BlockFetcher: s.cfg.ExecutionChainService,
PendingDepositsFetcher: s.cfg.PendingDepositFetcher,

View File

@@ -14,7 +14,6 @@ go_library(
"decode_pubsub.go",
"doc.go",
"error.go",
"exec_proofs.go",
"fork_watcher.go",
"fuzz_exports.go", # keep
"log.go",
@@ -32,7 +31,6 @@ go_library(
"rpc_chunked_response.go",
"rpc_data_column_sidecars_by_range.go",
"rpc_data_column_sidecars_by_root.go",
"rpc_execution_proofs_by_root_topic.go",
"rpc_goodbye.go",
"rpc_light_client.go",
"rpc_metadata.go",
@@ -48,7 +46,6 @@ go_library(
"subscriber_blob_sidecar.go",
"subscriber_bls_to_execution_change.go",
"subscriber_data_column_sidecar.go",
"subscriber_execution_proofs.go",
"subscriber_handlers.go",
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
@@ -60,8 +57,8 @@ go_library(
"validate_blob.go",
"validate_bls_to_execution_change.go",
"validate_data_column.go",
"validate_execution_proof.go",
"validate_light_client.go",
"validate_partial_header.go",
"validate_proposer_slashing.go",
"validate_sync_committee_message.go",
"validate_sync_contribution_proof.go",
@@ -102,6 +99,7 @@ go_library(
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/partialdatacolumnbroadcaster:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/slasher/types:go_default_library",

View File

@@ -2,8 +2,10 @@ package sync
import (
"context"
"iter"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
@@ -19,9 +21,16 @@ type signatureVerifier struct {
resChan chan error
}
type errorWithSegment struct {
err error
// segment is only available if the batched verification failed
segment peerdas.CellProofBundleSegment
}
type kzgVerifier struct {
dataColumns []blocks.RODataColumn
resChan chan error
sizeHint int
cellProofs iter.Seq[blocks.CellProofBundle]
resChan chan errorWithSegment
}
// A routine that runs in the background to perform batch

View File

@@ -1,65 +0,0 @@
package sync
import (
"fmt"
"time"
"errors"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
// generateExecProof returns a dummy execution proof after the specified delay.
func generateExecProof(roBlock blocks.ROBlock, proofID primitives.ExecutionProofId, delay time.Duration) (*ethpb.ExecutionProof, error) {
// Simulate proof generation work
time.Sleep(delay)
// Create a dummy proof with some deterministic data
block := roBlock.Block()
if block == nil {
return nil, errors.New("nil block")
}
body := block.Body()
if body == nil {
return nil, errors.New("nil block body")
}
executionData, err := body.Execution()
if err != nil {
return nil, fmt.Errorf("execution: %w", err)
}
if executionData == nil {
return nil, errors.New("nil execution data")
}
hash, err := executionData.HashTreeRoot()
if err != nil {
return nil, fmt.Errorf("hash tree root: %w", err)
}
proofData := []byte{
0xFF, // Magic byte for dummy proof
byte(proofID),
// Include some payload hash bytes
hash[0],
hash[1],
hash[2],
hash[3],
}
blockRoot := roBlock.Root()
proof := &ethpb.ExecutionProof{
ProofId: proofID,
Slot: block.Slot(),
BlockHash: hash[:],
BlockRoot: blockRoot[:],
ProofData: proofData,
}
return proof, nil
}

View File

@@ -256,6 +256,16 @@ var (
Help: "Count the number of data column sidecars obtained via the execution layer.",
},
)
usefulFullColumnsReceivedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_useful_full_columns_received_total",
Help: "Number of useful full columns (any cell being useful) received",
}, []string{"column_index"})
partialMessageColumnCompletionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "beacon_partial_message_column_completions_total",
Help: "How often the partial message first completed the column",
}, []string{"column_index"})
)
func (s *Service) updateMetrics() {

View File

@@ -167,25 +167,17 @@ func WithStateNotifier(n statefeed.Notifier) Option {
}
// WithBlobStorage gives the sync package direct access to BlobStorage.
func WithBlobStorage(storage *filesystem.BlobStorage) Option {
func WithBlobStorage(b *filesystem.BlobStorage) Option {
return func(s *Service) error {
s.cfg.blobStorage = storage
s.cfg.blobStorage = b
return nil
}
}
// WithDataColumnStorage gives the sync package direct access to DataColumnStorage.
func WithDataColumnStorage(storage *filesystem.DataColumnStorage) Option {
func WithDataColumnStorage(b *filesystem.DataColumnStorage) Option {
return func(s *Service) error {
s.cfg.dataColumnStorage = storage
return nil
}
}
// WithDataColumnStorage gives the sync package direct access to DataColumnStorage.
func WithExecutionProofStorage(storage *filesystem.ProofStorage) Option {
return func(s *Service) error {
s.cfg.proofStorage = storage
s.cfg.dataColumnStorage = b
return nil
}
}

View File

@@ -259,10 +259,6 @@ func (s *Service) processBlock(ctx context.Context, b interfaces.ReadOnlySignedB
return errors.Wrap(err, "request and save missing data column sidecars")
}
if err := s.requestAndSaveMissingExecutionProofs([]blocks.ROBlock{roBlock}); err != nil {
return errors.Wrap(err, "request and save missing execution proofs")
}
return nil
}

View File

@@ -100,10 +100,6 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
topicMap[addEncoding(p2p.RPCDataColumnSidecarsByRootTopicV1)] = dataColumnSidecars
// DataColumnSidecarsByRangeV1
topicMap[addEncoding(p2p.RPCDataColumnSidecarsByRangeTopicV1)] = dataColumnSidecars
executionProofs := leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */);
// ExecutionProofsByRootV1
topicMap[addEncoding(p2p.RPCExecutionProofsByRootTopicV1)] = executionProofs
// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */)

View File

@@ -17,7 +17,7 @@ import (
func TestNewRateLimiter(t *testing.T) {
rlimiter := newRateLimiter(mockp2p.NewTestP2P(t))
assert.Equal(t, len(rlimiter.limiterMap), 21, "correct number of topics not registered")
assert.Equal(t, len(rlimiter.limiterMap), 20, "correct number of topics not registered")
}
func TestNewRateLimiter_FreeCorrectly(t *testing.T) {

View File

@@ -51,7 +51,6 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, // Modified in Fulu
p2p.RPCDataColumnSidecarsByRootTopicV1: s.dataColumnSidecarByRootRPCHandler, // Added in Fulu
p2p.RPCDataColumnSidecarsByRangeTopicV1: s.dataColumnSidecarsByRangeRPCHandler, // Added in Fulu
p2p.RPCExecutionProofsByRootTopicV1: s.executionProofsByRootRPCHandler, // Added in Fulu
}, nil
}

View File

@@ -11,14 +11,11 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/sync/verify"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/config/features"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -90,84 +87,9 @@ func (s *Service) sendBeaconBlocksRequest(ctx context.Context, requests *types.B
return errors.Wrap(err, "request and save missing data columns")
}
if err := s.requestAndSaveMissingExecutionProofs(postFuluBlocks); err != nil {
return errors.Wrap(err, "request and save missing execution proofs")
}
return err
}
func (s *Service) requestAndSaveMissingExecutionProofs(blks []blocks.ROBlock) error {
if len(blks) == 0 {
return nil
}
// TODO: Parallelize requests for multiple blocks.
for _, blk := range blks {
if err := s.sendAndSaveExecutionProofs(s.ctx, blk); err != nil {
return err
}
}
return nil
}
func (s *Service) sendAndSaveExecutionProofs(ctx context.Context, block blocks.ROBlock) error {
if !features.Get().EnableZkvm {
return nil
}
// Check proof retention period.
blockEpoch := slots.ToEpoch(block.Block().Slot())
currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
if !params.WithinExecutionProofPeriod(blockEpoch, currentEpoch) {
return nil
}
// Check how many proofs are needed with Execution Proof Pool.
// TODO: All should return the same type ExecutionProofId.
root := block.Root()
proofStorage := s.cfg.proofStorage
storedIds := proofStorage.Summary(root).All()
count := uint64(len(storedIds))
if count >= params.BeaconConfig().MinProofsRequired {
return nil
}
alreadyHave := make([]primitives.ExecutionProofId, 0, len(storedIds))
for _, id := range storedIds {
alreadyHave = append(alreadyHave, primitives.ExecutionProofId(id))
}
// Construct request
req := &ethpb.ExecutionProofsByRootRequest{
BlockRoot: root[:],
CountNeeded: params.BeaconConfig().MinProofsRequired - count,
AlreadyHave: alreadyHave,
}
// Call SendExecutionProofByRootRequest
zkvmEnabledPeers := s.cfg.p2p.Peers().ZkvmEnabledPeers()
if len(zkvmEnabledPeers) == 0 {
return fmt.Errorf("no zkVM enabled peers available to request execution proofs")
}
// TODO: For simplicity, just pick the first peer for now.
// In the future, we can implement better peer selection logic.
pid := zkvmEnabledPeers[0]
proofs, err := SendExecutionProofsByRootRequest(ctx, s.cfg.clock, s.cfg.p2p, pid, req)
if err != nil {
return fmt.Errorf("send execution proofs by root request: %w", err)
}
// Save the proofs into storage.
if err := proofStorage.Save(proofs); err != nil {
return fmt.Errorf("proof storage save: %w", err)
}
return nil
}
// requestAndSaveMissingDataColumns checks if the data columns are missing for the given block.
// If so, requests them and saves them to the storage.
func (s *Service) requestAndSaveMissingDataColumnSidecars(blks []blocks.ROBlock) error {

View File

@@ -182,21 +182,3 @@ func WriteDataColumnSidecarChunk(stream libp2pcore.Stream, tor blockchain.Tempor
return nil
}
func WriteExecutionProofChunk(stream libp2pcore.Stream, encoding encoder.NetworkEncoding, proof *ethpb.ExecutionProof) error {
// Success response code.
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return errors.Wrap(err, "stream write")
}
ctxBytes := params.ForkDigest(slots.ToEpoch(proof.Slot))
if err := writeContextToStream(ctxBytes[:], stream); err != nil {
return errors.Wrap(err, "write context to stream")
}
// Execution proof.
if _, err := encoding.EncodeWithMaxLength(stream, proof); err != nil {
return errors.Wrap(err, "encode with max length")
}
return nil
}

View File

@@ -1,228 +0,0 @@
package sync
import (
"context"
"errors"
"fmt"
"io"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
// SendExecutionProofsByRootRequest sends ExecutionProofsByRoot request and returns fetched execution proofs, if any.
func SendExecutionProofsByRootRequest(
ctx context.Context,
clock blockchain.TemporalOracle,
p2pProvider p2p.P2P,
pid peer.ID,
req *ethpb.ExecutionProofsByRootRequest,
) ([]*ethpb.ExecutionProof, error) {
// Validate request
if req.CountNeeded == 0 {
return nil, errors.New("count_needed must be greater than 0")
}
topic, err := p2p.TopicFromMessage(p2p.ExecutionProofsByRootName, slots.ToEpoch(clock.CurrentSlot()))
if err != nil {
return nil, err
}
log.WithFields(logrus.Fields{
"topic": topic,
"block_root": bytesutil.ToBytes32(req.BlockRoot),
"count": req.CountNeeded,
"already": len(req.AlreadyHave),
}).Debug("Sending execution proofs by root request")
stream, err := p2pProvider.Send(ctx, req, topic, pid)
if err != nil {
return nil, err
}
defer closeStream(stream, log)
// Read execution proofs from stream
proofs := make([]*ethpb.ExecutionProof, 0, req.CountNeeded)
alreadyHaveSet := make(map[primitives.ExecutionProofId]struct{})
for _, id := range req.AlreadyHave {
alreadyHaveSet[id] = struct{}{}
}
for i := uint64(0); i < req.CountNeeded; i++ {
isFirstChunk := i == 0
proof, err := ReadChunkedExecutionProof(stream, p2pProvider, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
// Validate proof
if err := validateExecutionProof(proof, req, alreadyHaveSet); err != nil {
return nil, err
}
proofs = append(proofs, proof)
}
return proofs, nil
}
// ReadChunkedExecutionProof reads a chunked execution proof from the stream.
func ReadChunkedExecutionProof(
stream libp2pcore.Stream,
encoding p2p.EncodingProvider,
isFirstChunk bool,
) (*ethpb.ExecutionProof, error) {
// Read status code for each chunk (like data columns, not like blocks)
code, errMsg, err := ReadStatusCode(stream, encoding.Encoding())
if err != nil {
return nil, err
}
if code != 0 {
return nil, errors.New(errMsg)
}
// Read context bytes (fork digest)
_, err = readContextFromStream(stream)
if err != nil {
return nil, fmt.Errorf("read context from stream: %w", err)
}
// Decode the proof
proof := &ethpb.ExecutionProof{}
if err := encoding.Encoding().DecodeWithMaxLength(stream, proof); err != nil {
return nil, err
}
return proof, nil
}
// validateExecutionProof validates a received execution proof against the request.
func validateExecutionProof(
proof *ethpb.ExecutionProof,
req *ethpb.ExecutionProofsByRootRequest,
alreadyHaveSet map[primitives.ExecutionProofId]struct{},
) error {
// Check block root matches
proofRoot := bytesutil.ToBytes32(proof.BlockRoot)
reqRoot := bytesutil.ToBytes32(req.BlockRoot)
if proofRoot != reqRoot {
return fmt.Errorf("proof block root %#x does not match requested root %#x",
proofRoot, reqRoot)
}
// Check we didn't already have this proof
if _, ok := alreadyHaveSet[proof.ProofId]; ok {
return fmt.Errorf("received proof we already have: proof_id=%d", proof.ProofId)
}
// Check proof ID is valid (within max range)
if !proof.ProofId.IsValid() {
return fmt.Errorf("invalid proof_id: %d", proof.ProofId)
}
return nil
}
// executionProofsByRootRPCHandler handles incoming ExecutionProofsByRoot RPC requests.
func (s *Service) executionProofsByRootRPCHandler(ctx context.Context, msg any, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.executionProofsByRootRPCHandler")
defer span.End()
_, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
req, ok := msg.(*ethpb.ExecutionProofsByRootRequest)
if !ok {
return errors.New("message is not type ExecutionProofsByRootRequest")
}
remotePeer := stream.Conn().RemotePeer()
SetRPCStreamDeadlines(stream)
// Validate request
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
return err
}
// Penalize peers that send invalid requests.
if err := validateExecutionProofsByRootRequest(req); err != nil {
s.downscorePeer(remotePeer, "executionProofsByRootRPCHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return fmt.Errorf("validate execution proofs by root request: %w", err)
}
blockRoot := bytesutil.ToBytes32(req.BlockRoot)
log := log.WithFields(logrus.Fields{
"blockroot": fmt.Sprintf("%#x", blockRoot),
"neededCount": req.CountNeeded,
"alreadyHave": req.AlreadyHave,
"peer": remotePeer.String(),
})
s.rateLimiter.add(stream, 1)
defer closeStream(stream, log)
// Get proofs from execution proof pool
summary := s.cfg.proofStorage.Summary(blockRoot)
// Filter out not requested proofs
alreadyHave := make(map[primitives.ExecutionProofId]bool)
for _, id := range req.AlreadyHave {
alreadyHave[id] = true
}
// Determine which proofs to fetch (not already had by requester)
proofIDsToFetch := make([]uint64, 0, len(summary.All()))
for _, proofId := range summary.All() {
if !alreadyHave[primitives.ExecutionProofId(proofId)] {
proofIDsToFetch = append(proofIDsToFetch, proofId)
}
}
// Load all proofs at once
proofs, err := s.cfg.proofStorage.Get(blockRoot, proofIDsToFetch)
if err != nil {
return fmt.Errorf("proof storage get: %w", err)
}
// Send proofs
sentCount := uint64(0)
for _, proof := range proofs {
if sentCount >= req.CountNeeded {
break
}
// Write proof to stream
SetStreamWriteDeadline(stream, defaultWriteDuration)
if err := WriteExecutionProofChunk(stream, s.cfg.p2p.Encoding(), proof); err != nil {
log.WithError(err).Debug("Could not send execution proof")
s.writeErrorResponseToStream(responseCodeServerError, "could not send execution proof", stream)
return err
}
sentCount++
}
log.WithField("sentCount", sentCount).Debug("Responded to execution proofs by root request")
return nil
}
func validateExecutionProofsByRootRequest(req *ethpb.ExecutionProofsByRootRequest) error {
if req.CountNeeded == 0 {
return errors.New("count_needed must be greater than 0")
}
return nil
}

View File

@@ -70,7 +70,6 @@ const (
seenProposerSlashingSize = 100
badBlockSize = 1000
syncMetricsInterval = 10 * time.Second
seenExecutionProofSize = 100
)
var (
@@ -110,7 +109,6 @@ type config struct {
stateNotifier statefeed.Notifier
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
proofStorage *filesystem.ProofStorage
batchVerifierLimit int
}
@@ -119,7 +117,6 @@ type blockchainService interface {
blockchain.BlockReceiver
blockchain.BlobReceiver
blockchain.DataColumnReceiver
blockchain.ProofReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ForkFetcher
@@ -152,7 +149,6 @@ type Service struct {
seenBlobLock sync.RWMutex
seenBlobCache *lru.Cache
seenDataColumnCache *slotAwareCache
seenProofCache *slotAwareCache
seenAggregatedAttestationLock sync.RWMutex
seenAggregatedAttestationCache *lru.Cache
seenUnAggregatedAttestationLock sync.RWMutex
@@ -177,7 +173,6 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnsVerifier verification.NewDataColumnsVerifier
newProofsVerifier verification.NewExecutionProofsVerifier
columnSidecarsExecSingleFlight singleflight.Group
reconstructionSingleFlight singleflight.Group
availableBlocker coverage.AvailableBlocker
@@ -239,6 +234,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
r.subHandler = newSubTopicHandler()
r.rateLimiter = newRateLimiter(r.cfg.p2p)
r.initCaches()
return r
}
@@ -254,12 +250,6 @@ func newDataColumnsVerifierFromInitializer(ini *verification.Initializer) verifi
}
}
func newExecutionProofsVerifierFromInitializer(ini *verification.Initializer) verification.NewExecutionProofsVerifier {
return func(proofs []blocks.ROExecutionProof, reqs []verification.Requirement) verification.ExecutionProofsVerifier {
return ini.NewExecutionProofsVerifier(proofs, reqs)
}
}
// Start the regular sync service.
func (s *Service) Start() {
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
@@ -269,7 +259,6 @@ func (s *Service) Start() {
}
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
s.newProofsVerifier = newExecutionProofsVerifierFromInitializer(v)
go s.verifierRoutine()
go s.startDiscoveryAndSubscriptions()
@@ -359,7 +348,6 @@ func (s *Service) initCaches() {
s.seenBlockCache = lruwrpr.New(seenBlockSize)
s.seenBlobCache = lruwrpr.New(seenBlockSize * params.BeaconConfig().DeprecatedMaxBlobsPerBlockElectra)
s.seenDataColumnCache = newSlotAwareCache(seenDataColumnSize)
s.seenProofCache = newSlotAwareCache(seenExecutionProofSize)
s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize)
s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize)
s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize)

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -14,11 +16,13 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -61,6 +65,15 @@ type subscribeParameters struct {
// getSubnetsRequiringPeers is a function that returns all subnets that require peers to be found
// but for which no subscriptions are needed.
getSubnetsRequiringPeers func(currentSlot primitives.Slot) map[uint64]bool
partial *partialSubscribeParameters
}
type partialSubscribeParameters struct {
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -320,6 +333,35 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
var ps *partialSubscribeParameters
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if broadcaster != nil {
ps = &partialSubscribeParameters{
broadcaster: broadcaster,
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(context.TODO(), header)
},
validate: func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
handle: func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
@@ -327,19 +369,9 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
nse: nse,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
partial: ps,
})
})
if features.Get().EnableZkvm {
s.spawn(func() {
s.subscribe(
p2p.ExecutionProofSubnetTopicFormat,
s.validateExecutionProof,
s.executionProofSubscriber,
nse,
)
})
}
}
return true
}
@@ -376,11 +408,10 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest)+s.cfg.p2p.Encoding().ProtocolSuffix(), validator, handle)
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
@@ -543,7 +574,11 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
topic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
if t.partial != nil {
_ = t.partial.broadcaster.Unsubscribe(topic)
}
s.unSubscribeFromTopic(topic)
}
}
@@ -590,9 +625,34 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
topicStr := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
topicOpts := make([]pubsub.TopicOpt, 0, 2)
requestPartial := t.partial != nil
if requestPartial {
// TODO: do we want the ability to support partial messages without requesting them?
topicOpts = append(topicOpts, pubsub.RequestPartialMessages())
}
topic, err := s.cfg.p2p.JoinTopic(topicStr, topicOpts...)
if err != nil {
log.WithError(err).Error("Failed to join topic")
return
}
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")
}
}
// We still need to subscribe to the full columns as well as partial in
// case our peers don't support partial messages.
t.track(subnet, s.subscribeWithBase(topicStr, t.validate, t.handle))
}
}

View File

@@ -11,7 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/config/features"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
@@ -23,7 +23,6 @@ import (
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -79,11 +78,6 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
// We use the service context to ensure this context is not cancelled
// when the current function returns.
// TODO: Do not broadcast proofs for blocks we have already seen.
go s.generateAndBroadcastExecutionProofs(s.ctx, roBlock)
if err := s.processPendingAttsForBlock(ctx, root); err != nil {
return errors.Wrap(err, "process pending atts for block")
}
@@ -91,47 +85,6 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return nil
}
func (s *Service) generateAndBroadcastExecutionProofs(ctx context.Context, roBlock blocks.ROBlock) {
const delay = 2 * time.Second
proofTypes := flags.Get().ProofGenerationTypes
// Exit early if proof generation is disabled.
if len(proofTypes) == 0 {
return
}
var wg errgroup.Group
for _, proofType := range proofTypes {
wg.Go(func() error {
execProof, err := generateExecProof(roBlock, primitives.ExecutionProofId(proofType), delay)
if err != nil {
return fmt.Errorf("generate exec proof: %w", err)
}
if err := s.cfg.p2p.Broadcast(ctx, execProof); err != nil {
return fmt.Errorf("broadcast exec proof: %w", err)
}
if err := s.cfg.chain.ReceiveProof(execProof); err != nil {
return errors.Wrap(err, "receive proof")
}
return nil
})
}
if err := wg.Wait(); err != nil {
log.WithError(err).Error("Failed to generate and broadcast execution proofs")
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", roBlock.Root()),
"slot": roBlock.Block().Slot(),
"count": len(proofTypes),
}).Debug("Generated and broadcasted execution proofs")
}
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
@@ -249,6 +202,16 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "column indices to sample")
}
// TODO: the deadline here was removed in https://github.com/OffchainLabs/prysm/pull/16155/files
// make sure that reintroducing it does not cause issues.
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
digest, err := s.currentForkDigest()
if err != nil {
return nil, err
}
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
@@ -279,11 +242,30 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
}
// Try to reconstruct data column constructedSidecars from the execution client.
constructedSidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
constructedSidecars, partialColumns, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
if err != nil {
return nil, errors.Wrap(err, "reconstruct data column sidecars")
}
partialBroadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if partialBroadcaster != nil {
log.WithField("len(partialColumns)", len(partialColumns)).Debug("Publishing partial columns")
for i := range uint64(len(partialColumns)) {
if !columnIndicesToSample[i] {
continue
}
subnet := peerdas.ComputeSubnetForDataColumnSidecar(i)
topic := fmt.Sprintf(p2p.DataColumnSubnetTopicFormat, digest, subnet) + s.cfg.p2p.Encoding().ProtocolSuffix()
// Publish the partial column. This is idempotent if we republish the same data twice.
// Note, the "partial column" may indeed be complete. We still
// should publish to help our peers.
err = partialBroadcaster.Publish(topic, partialColumns[i])
if err != nil {
log.WithError(err).Warn("Failed to publish partial column")
}
}
}
// No sidecars are retrieved from the EL, retry later
constructedCount := uint64(len(constructedSidecars))
@@ -355,7 +337,7 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars, nil); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars")
}

View File

@@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"strconv"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
@@ -24,6 +25,13 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg)
}
// Track useful full columns received via gossip (not previously seen)
slot := sidecar.SignedBlockHeader.Header.Slot
proposerIndex := sidecar.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, sidecar.Index) {
usefulFullColumnsReceivedTotal.WithLabelValues(strconv.FormatUint(sidecar.Index, 10)).Inc()
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return wrapDataColumnError(sidecar, "receive data column sidecar", err)
}
@@ -57,6 +65,38 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return nil
}
func (s *Service) verifiedRODataColumnSubscriber(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
log.WithField("slot", sidecar.Slot()).WithField("column", sidecar.Index).Info("Received data column sidecar")
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
}
var wg errgroup.Group
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction")
}
return nil
})
wg.Go(func() error {
// Broadcast our complete column for peers that don't use partial messages
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{sidecar}, nil); err != nil {
return errors.Wrap(err, "process data column sidecars from execution")
}
return nil
})
if err := wg.Wait(); err != nil {
return err
}
return nil
}
// receiveDataColumnSidecar receives a single data column sidecar: marks it as seen and saves it to the chain.
// Do not loop over this function to receive multiple sidecars, use receiveDataColumnSidecars instead.
func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {

View File

@@ -1,36 +0,0 @@
package sync
import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
func (s *Service) executionProofSubscriber(_ context.Context, msg proto.Message) error {
verifiedProof, ok := msg.(blocks.VerifiedROExecutionProof)
if !ok {
return errors.Errorf("incorrect type of message received, wanted %T but got %T", blocks.VerifiedROExecutionProof{}, msg)
}
// Insert the execution proof into the pool
s.setSeenProof(verifiedProof.Slot(), verifiedProof.BlockRoot(), verifiedProof.ProofId())
// Save the proof to storage.
if err := s.cfg.chain.ReceiveProof(verifiedProof.ExecutionProof); err != nil {
return errors.Wrap(err, "receive proof")
}
// Notify subscribers about the new execution proof
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.ExecutionProofReceived,
Data: &opfeed.ExecutionProofReceivedData{
ExecutionProof: verifiedProof.ExecutionProof,
},
})
return nil
}

View File

@@ -71,6 +71,7 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
roDataColumns := []blocks.RODataColumn{roDataColumn}
// Create the verifier.
// Question(marco): Do we want the multiple columns verifier? Is batching used only for kzg proofs?
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
// Start the verification process.

View File

@@ -1,110 +0,0 @@
package sync
import (
"context"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
func (s *Service) validateExecutionProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
// Always accept our own messages.
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
// Ignore messages during initial sync.
if s.cfg.initialSync.Syncing() {
return pubsub.ValidationIgnore, nil
}
// Reject messages with a nil topic.
if msg.Topic == nil {
return pubsub.ValidationReject, p2p.ErrInvalidTopic
}
// Decode the message, reject if it fails.
m, err := s.decodePubsubMessage(msg)
if err != nil {
log.WithError(err).Error("Failed to decode message")
return pubsub.ValidationReject, err
}
// Reject messages that are not of the expected type.
executionProof, ok := m.(*ethpb.ExecutionProof)
if !ok {
log.WithField("message", m).Error("Message is not of type *ethpb.ExecutionProof")
return pubsub.ValidationReject, errWrongMessage
}
// Convert to ROExecutionProof.
roProof, err := blocks.NewROExecutionProof(executionProof)
if err != nil {
return pubsub.ValidationReject, err
}
// Check if the proof has already been seen.
if s.hasSeenProof(roProof.BlockRoot(), roProof.ProofId()) {
return pubsub.ValidationIgnore, nil
}
// Create the verifier with gossip requirements.
verifier := s.newProofsVerifier([]blocks.ROExecutionProof{roProof}, verification.GossipExecutionProofRequirements)
// Run verifications.
if err := verifier.NotFromFutureSlot(); err != nil {
return pubsub.ValidationReject, err
}
if err := verifier.ProofSizeLimits(); err != nil {
return pubsub.ValidationReject, err
}
if err := verifier.ProofVerified(); err != nil {
return pubsub.ValidationReject, err
}
// Get verified proofs.
verifiedProofs, err := verifier.VerifiedROExecutionProofs()
if err != nil {
return pubsub.ValidationIgnore, err
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", roProof.BlockRoot()),
"slot": roProof.Slot(),
"id": roProof.ProofId(),
}).Debug("Accepted execution proof")
// Set validator data to the verified proof.
msg.ValidatorData = verifiedProofs[0]
return pubsub.ValidationAccept, nil
}
// hasSeenProof returns true if the proof with the same block root and proof ID has been seen before.
func (s *Service) hasSeenProof(blockRoot [32]byte, proofId primitives.ExecutionProofId) bool {
key := computeProofCacheKey(blockRoot, proofId)
_, seen := s.seenProofCache.Get(key)
return seen
}
// setSeenProof marks the proof with the given block root and proof ID as seen.
func (s *Service) setSeenProof(slot primitives.Slot, blockRoot [32]byte, proofId primitives.ExecutionProofId) {
key := computeProofCacheKey(blockRoot, proofId)
s.seenProofCache.Add(slot, key, true)
}
func computeProofCacheKey(blockRoot [32]byte, proofId primitives.ExecutionProofId) string {
key := make([]byte, 0, 33)
key = append(key, blockRoot[:]...)
key = append(key, bytesutil.Bytes1(uint64(proofId))...)
return string(key)
}

View File

@@ -0,0 +1,143 @@
package sync
import (
"context"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
)
var (
// REJECT errors - peer should be penalized
errHeaderEmptyCommitments = errors.New("header has no kzg commitments")
errHeaderParentInvalid = errors.New("header parent invalid")
errHeaderSlotNotAfterParent = errors.New("header slot not after parent")
errHeaderNotFinalizedDescendant = errors.New("header not finalized descendant")
errHeaderInvalidInclusionProof = errors.New("invalid inclusion proof")
errHeaderInvalidSignature = errors.New("invalid proposer signature")
errHeaderUnexpectedProposer = errors.New("unexpected proposer index")
// IGNORE errors - don't penalize peer
errHeaderNil = errors.New("nil header")
errHeaderFromFuture = errors.New("header is from future slot")
errHeaderNotAboveFinalized = errors.New("header slot not above finalized")
errHeaderParentNotSeen = errors.New("header parent not seen")
)
// validatePartialDataColumnHeader validates a PartialDataColumnHeader per the consensus spec.
// Returns (reject, err) where reject=true means the peer should be penalized.
// TODO: we should consolidate this with the existing DataColumn validation pipeline.
func (s *Service) validatePartialDataColumnHeader(ctx context.Context, header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil || header.SignedBlockHeader == nil || header.SignedBlockHeader.Header == nil {
return false, errHeaderNil // IGNORE
}
blockHeader := header.SignedBlockHeader.Header
headerSlot := blockHeader.Slot
parentRoot := bytesutil.ToBytes32(blockHeader.ParentRoot)
// [REJECT] kzg_commitments list is non-empty
if len(header.KzgCommitments) == 0 {
return true, errHeaderEmptyCommitments
}
// [IGNORE] Not from future slot (with MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
currentSlot := s.cfg.clock.CurrentSlot()
if headerSlot > currentSlot {
maxDisparity := params.BeaconConfig().MaximumGossipClockDisparityDuration()
slotStart, err := s.cfg.clock.SlotStart(headerSlot)
if err != nil {
return false, err
}
if s.cfg.clock.Now().Before(slotStart.Add(-maxDisparity)) {
return false, errHeaderFromFuture // IGNORE
}
}
// [IGNORE] Slot above finalized
finalizedCheckpoint := s.cfg.chain.FinalizedCheckpt()
startSlot, err := slots.EpochStart(finalizedCheckpoint.Epoch)
if err != nil {
return false, err
}
if headerSlot <= startSlot {
return false, errHeaderNotAboveFinalized // IGNORE
}
// [IGNORE] Parent has been seen
if !s.cfg.chain.HasBlock(ctx, parentRoot) {
return false, errHeaderParentNotSeen // IGNORE
}
// [REJECT] Parent passes validation (not a bad block)
if s.hasBadBlock(parentRoot) {
return true, errHeaderParentInvalid
}
// [REJECT] Header slot > parent slot
parentSlot, err := s.cfg.chain.RecentBlockSlot(parentRoot)
if err != nil {
return false, errors.Wrap(err, "get parent slot")
}
if headerSlot <= parentSlot {
return true, errHeaderSlotNotAfterParent
}
// [REJECT] Finalized checkpoint is ancestor (parent is in forkchoice)
if !s.cfg.chain.InForkchoice(parentRoot) {
return true, errHeaderNotFinalizedDescendant
}
// [REJECT] Inclusion proof valid
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, errHeaderInvalidInclusionProof
}
// [REJECT] Valid proposer signature
parentState, err := s.cfg.stateGen.StateByRoot(ctx, parentRoot)
if err != nil {
return false, errors.Wrap(err, "get parent state")
}
proposerIdx := blockHeader.ProposerIndex
proposer, err := parentState.ValidatorAtIndex(proposerIdx)
if err != nil {
return false, errors.Wrap(err, "get proposer")
}
domain, err := signing.Domain(
parentState.Fork(),
slots.ToEpoch(headerSlot),
params.BeaconConfig().DomainBeaconProposer,
parentState.GenesisValidatorsRoot(),
)
if err != nil {
return false, errors.Wrap(err, "get domain")
}
if err := signing.VerifyBlockHeaderSigningRoot(
blockHeader,
proposer.PublicKey,
header.SignedBlockHeader.Signature,
domain,
); err != nil {
return true, errHeaderInvalidSignature
}
// [REJECT] Expected proposer for slot
expectedProposer, err := helpers.BeaconProposerIndexAtSlot(ctx, parentState, headerSlot)
if err != nil {
return false, errors.Wrap(err, "compute expected proposer")
}
if expectedProposer != proposerIdx {
return true, errHeaderUnexpectedProposer
}
return false, nil // Valid header
}

View File

@@ -8,7 +8,6 @@ go_library(
"cache.go",
"data_column.go",
"error.go",
"execution_proof.go",
"fake.go",
"filesystem.go",
"initializer.go",

Some files were not shown because too many files have changed in this diff Show More