Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-11 06:18:05 -05:00)

Compare commits: prysmctl-l...topic-bug (6 commits)
Commits:

- c603321733
- 8f816d6f49
- 0a8603268b
- 6fd2d5f268
- 8056f55522
- 9ab42d18da
.bazelrc (1 line changed)

@@ -34,7 +34,6 @@ build:minimal --@io_bazel_rules_go//go/config:tags=minimal
 build:release --compilation_mode=opt
 build:release --stamp
 build:release --define pgo_enabled=1
-build:release --strip=always

 # Build binary with cgo symbolizer for debugging / profiling.
 build:cgo_symbolizer --copt=-g
.github/workflows/changelog.yml (vendored, 2 lines changed)

@@ -9,7 +9,7 @@ on:

 jobs:
   run-changelog-check:
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     steps:
       - name: Checkout source code
         uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
.github/workflows/check-specrefs.yml (vendored, 2 lines changed)

@@ -3,7 +3,7 @@ on: [push, pull_request]

 jobs:
   check-specrefs:
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest

     steps:
       - name: Checkout repository
.github/workflows/clang-format.yml (vendored, 2 lines changed)

@@ -10,7 +10,7 @@ on:

 jobs:
   clang-format-checking:
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
    steps:
       - uses: actions/checkout@v2
       # Is this step failing for you?
.github/workflows/fuzz.yml (vendored, 4 lines changed)

@@ -10,7 +10,7 @@ permissions:

 jobs:
   list:
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     timeout-minutes: 180
     steps:
       - uses: actions/checkout@v3

@@ -25,7 +25,7 @@ jobs:
       fuzz-tests: ${{steps.list.outputs.fuzz-tests}}

   fuzz:
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     timeout-minutes: 360
     needs: list
     strategy:
.github/workflows/go.yml (vendored, 8 lines changed)

@@ -11,7 +11,7 @@ on:
 jobs:
   formatting:
     name: Formatting
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     steps:
       - name: Checkout
         uses: actions/checkout@v4

@@ -22,7 +22,7 @@ jobs:

   gosec:
     name: Gosec scan
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     env:
       GO111MODULE: on
     steps:

@@ -40,7 +40,7 @@ jobs:

   lint:
     name: Lint
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     steps:
       - name: Checkout
         uses: actions/checkout@v4

@@ -59,7 +59,7 @@ jobs:

   build:
     name: Build
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     steps:
       - name: Set up Go 1.25.1
         uses: actions/setup-go@v4
.github/workflows/horusec.yaml (vendored, 4 lines changed)

@@ -8,7 +8,7 @@ on:
 jobs:
   Horusec_Scan:
     name: horusec-Scan
-    runs-on: ubuntu-4
+    runs-on: ubuntu-latest
     if: github.ref == 'refs/heads/develop'
     steps:
       - name: Check out code

@@ -19,4 +19,4 @@ jobs:
       - name: Running Security Scan
         run: |
           curl -fsSL https://raw.githubusercontent.com/ZupIT/horusec/main/deployments/scripts/install.sh | bash -s latest
-          horusec start -t="10000" -p="./" -e="true" -i="**/crypto/bls/herumi/**, **/**/*_test.go, **/third_party/afl/**, **/crypto/keystore/key.go"
+          horusec start -t="10000" -p="./" -e="true" -i="**/crypto/bls/herumi/**, **/**/*_test.go, **/third_party/afl/**, **/crypto/keystore/key.go"
@@ -109,7 +109,6 @@ func VerifyCellKZGProofBatch(commitmentsBytes []Bytes48, cellIndices []uint64, c
 }

 // RecoverCellsAndKZGProofs recovers the complete cells and KZG proofs from a given set of cell indices and partial cells.
-// Note: `len(cellIndices)` must be equal to `len(partialCells)` and `cellIndices` must be sorted in ascending order.
 func RecoverCellsAndKZGProofs(cellIndices []uint64, partialCells []Cell) (CellsAndProofs, error) {
 	// Convert `Cell` type to `ckzg4844.Cell`
 	ckzgPartialCells := make([]ckzg4844.Cell, len(partialCells))
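With the sorted-input note gone, callers of RecoverCellsAndKZGProofs only need matching index/cell slices. A minimal sketch of such a call against the API shown in this hunk; the helper name recoverBlobCells and the columnsByIndex argument are hypothetical:

// Sketch only: recover a blob's full cell set from the partial columns we hold.
// `columnsByIndex` maps a column index to the cell we custody for one blob.
func recoverBlobCells(columnsByIndex map[uint64]kzg.Cell) (kzg.CellsAndProofs, error) {
	indices := make([]uint64, 0, len(columnsByIndex))
	cells := make([]kzg.Cell, 0, len(columnsByIndex))
	for idx, cell := range columnsByIndex {
		indices = append(indices, idx)
		cells = append(cells, cell)
	}
	// len(indices) == len(cells) by construction; ordering no longer matters
	// once the sorted-input requirement is dropped.
	return kzg.RecoverCellsAndKZGProofs(indices, cells)
}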
@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
 	return nil
 }

-func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
+func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
 	mb.broadcastCalled = true
 	return nil
 }
@@ -1,8 +1,6 @@
 package peerdas

 import (
-	"sort"
-
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
 	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
 	"github.com/OffchainLabs/prysm/v6/config/params"
@@ -30,8 +28,7 @@ func MinimumColumnCountToReconstruct() uint64 {

 // ReconstructDataColumnSidecars reconstructs all the data column sidecars from the given input data column sidecars.
 // All input sidecars must be committed to the same block.
-// `inVerifiedRoSidecars` should contain enough sidecars to reconstruct the missing columns, and should not contain any duplicate.
-// WARNING: This function sorts inplace `verifiedRoSidecars` by index.
+// `inVerifiedRoSidecars` should contain enough (unique) sidecars to reconstruct the missing columns.
 func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataColumn) ([]blocks.VerifiedRODataColumn, error) {
 	// Check if there is at least one input sidecar.
 	if len(verifiedRoSidecars) == 0 {
@@ -54,17 +51,18 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
 		}
 	}

+	// Deduplicate sidecars.
+	sidecarByIndex := make(map[uint64]blocks.VerifiedRODataColumn, len(verifiedRoSidecars))
+	for _, inVerifiedRoSidecar := range verifiedRoSidecars {
+		sidecarByIndex[inVerifiedRoSidecar.Index] = inVerifiedRoSidecar
+	}
+
 	// Check if there is enough sidecars to reconstruct the missing columns.
-	sidecarCount := len(verifiedRoSidecars)
+	sidecarCount := len(sidecarByIndex)
 	if uint64(sidecarCount) < MinimumColumnCountToReconstruct() {
 		return nil, ErrNotEnoughDataColumnSidecars
 	}

-	// Sort the input sidecars by index.
-	sort.Slice(verifiedRoSidecars, func(i, j int) bool {
-		return verifiedRoSidecars[i].Index < verifiedRoSidecars[j].Index
-	})
-
 	// Recover cells and compute proofs in parallel.
 	var wg errgroup.Group
 	cellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
@@ -73,10 +71,10 @@ func ReconstructDataColumnSidecars(verifiedRoSidecars []blocks.VerifiedRODataCol
 		cellsIndices := make([]uint64, 0, sidecarCount)
 		cells := make([]kzg.Cell, 0, sidecarCount)

-		for _, sidecar := range verifiedRoSidecars {
+		for columnIndex, sidecar := range sidecarByIndex {
 			cell := sidecar.Column[blobIndex]
 			cells = append(cells, kzg.Cell(cell))
-			cellsIndices = append(cellsIndices, sidecar.Index)
+			cellsIndices = append(cellsIndices, columnIndex)
 		}

 		// Recover the cells and proofs for the corresponding blob
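Together these hunks replace sort-based preprocessing with a dedup map: uniqueness comes from the map keys, and the threshold is checked against unique columns rather than raw input length (with a hypothetical threshold of 64, for example, 70 inputs containing 10 duplicates count as only 60 unique columns and are rejected). A condensed sketch of the pattern using the names from this diff:

// Sketch: deduplicate sidecars by column index, then gate on the unique count.
sidecarByIndex := make(map[uint64]blocks.VerifiedRODataColumn, len(verifiedRoSidecars))
for _, sc := range verifiedRoSidecars {
	sidecarByIndex[sc.Index] = sc // later duplicates overwrite earlier ones, harmlessly
}
if uint64(len(sidecarByIndex)) < MinimumColumnCountToReconstruct() {
	return nil, ErrNotEnoughDataColumnSidecars
}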
@@ -162,7 +162,6 @@ go_test(
 		"//cmd/beacon-chain/flags:go_default_library",
 		"//config/fieldparams:go_default_library",
 		"//config/params:go_default_library",
 		"//consensus-types/blocks:go_default_library",
 		"//consensus-types/interfaces:go_default_library",
 		"//consensus-types/primitives:go_default_library",
 		"//consensus-types/wrapper:go_default_library",
@@ -5,18 +5,14 @@ import (
 	"context"
 	"fmt"
 	"reflect"
-	"slices"
-	"sync"
 	"time"

 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
-	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
 	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
 	"github.com/OffchainLabs/prysm/v6/config/params"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
-	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
 	"github.com/OffchainLabs/prysm/v6/crypto/hash"
 	"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
 	"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
@@ -310,150 +306,86 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
 	return nil
 }

-// BroadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
-// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
-// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
-// cancelled (whichever comes first).
-func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
-	// Increase the number of broadcast attempts.
-	dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
+// BroadcastDataColumnSidecar broadcasts a data column to the p2p network, the message is assumed to be
+// broadcasted to the current fork and to the input column subnet.
+func (s *Service) BroadcastDataColumnSidecar(
+	dataColumnSubnet uint64,
+	dataColumnSidecar blocks.VerifiedRODataColumn,
+) error {
+	// Add tracing to the function.
+	ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumnSidecar")
+	defer span.End()

 	// Retrieve the current fork digest.
 	forkDigest, err := s.currentForkDigest()
 	if err != nil {
-		return errors.Wrap(err, "current fork digest")
+		err := errors.Wrap(err, "current fork digest")
+		tracing.AnnotateError(span, err)
+		return err
 	}

-	go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
+	// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
+	go s.internalBroadcastDataColumnSidecar(ctx, dataColumnSubnet, dataColumnSidecar, forkDigest)

 	return nil
 }

-// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
-// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
-// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
-func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
-	type rootAndIndex struct {
-		root  [fieldparams.RootLength]byte
-		index uint64
-	}
-
-	var (
-		wg      sync.WaitGroup
-		timings sync.Map
-	)
-
-	logLevel := logrus.GetLevel()
-
-	slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
-	for _, sidecar := range sidecars {
-		slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
-
-		wg.Go(func() {
-			// Add tracing to the function.
-			ctx, span := trace.StartSpan(s.ctx, "p2p.broadcastDataColumnSidecars")
-			defer span.End()
-
-			// Compute the subnet for this data column sidecar.
-			subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
-
-			// Build the topic corresponding to subnet column subnet and this fork digest.
-			topic := dataColumnSubnetToTopic(subnet, forkDigest)
-
-			// Compute the wrapped subnet index.
-			wrappedSubIdx := subnet + dataColumnSubnetVal
-
-			// Find peers if needed.
-			if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
-				tracing.AnnotateError(span, err)
-				log.WithError(err).Error("Cannot find peers if needed")
-				return
-			}
-
-			// Broadcast the data column sidecar to the network.
-			if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
-				tracing.AnnotateError(span, err)
-				log.WithError(err).Error("Cannot broadcast data column sidecar")
-				return
-			}
-
-			// Increase the number of successful broadcasts.
-			dataColumnSidecarBroadcasts.Inc()
-
-			// Record the timing for log purposes.
-			if logLevel >= logrus.DebugLevel {
-				root := sidecar.BlockRoot()
-				timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
-			}
-		})
-	}
-
-	// Wait for all broadcasts to finish.
-	wg.Wait()
-
-	// The rest of this function is only for debug logging purposes.
-	if logLevel < logrus.DebugLevel {
-		return
-	}
-
-	type logInfo struct {
-		durationMin time.Duration
-		durationMax time.Duration
-		indices     []uint64
-	}
-
-	logInfoPerRoot := make(map[[fieldparams.RootLength]byte]*logInfo, 1)
-
-	timings.Range(func(key any, value any) bool {
-		rootAndIndex, ok := key.(rootAndIndex)
-		if !ok {
-			log.Error("Could not cast key to rootAndIndex")
-			return true
-		}
-
-		broadcastTime, ok := value.(time.Time)
-		if !ok {
-			log.Error("Could not cast value to time.Time")
-			return true
-		}
-
-		slot, ok := slotPerRoot[rootAndIndex.root]
-		if !ok {
-			log.WithField("root", fmt.Sprintf("%#x", rootAndIndex.root)).Error("Could not find slot for root")
-			return true
-		}
-
-		duration, err := slots.SinceSlotStart(slot, s.genesisTime, broadcastTime)
-		if err != nil {
-			log.WithError(err).Error("Could not compute duration since slot start")
-			return true
-		}
-
-		info, ok := logInfoPerRoot[rootAndIndex.root]
-		if !ok {
-			logInfoPerRoot[rootAndIndex.root] = &logInfo{durationMin: duration, durationMax: duration, indices: []uint64{rootAndIndex.index}}
-			return true
-		}
-
-		info.durationMin = min(info.durationMin, duration)
-		info.durationMax = max(info.durationMax, duration)
-		info.indices = append(info.indices, rootAndIndex.index)
-
-		return true
-	})
-
-	for root, info := range logInfoPerRoot {
-		slices.Sort(info.indices)
-
-		log.WithFields(logrus.Fields{
-			"root":                  fmt.Sprintf("%#x", root),
-			"slot":                  slotPerRoot[root],
-			"count":                 len(info.indices),
-			"indices":               helpers.PrettySlice(info.indices),
-			"timeSinceSlotStartMin": info.durationMin,
-			"timeSinceSlotStartMax": info.durationMax,
-		}).Debug("Broadcasted data column sidecars")
-	}
-}
+func (s *Service) internalBroadcastDataColumnSidecar(
+	ctx context.Context,
+	columnSubnet uint64,
+	dataColumnSidecar blocks.VerifiedRODataColumn,
+	forkDigest [fieldparams.VersionLength]byte,
+) {
+	// Add tracing to the function.
+	_, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumnSidecar")
+	defer span.End()
+
+	// Increase the number of broadcast attempts.
+	dataColumnSidecarBroadcastAttempts.Inc()
+
+	// Define a one-slot length context timeout.
+	secondsPerSlot := params.BeaconConfig().SecondsPerSlot
+	oneSlot := time.Duration(secondsPerSlot) * time.Second
+	ctx, cancel := context.WithTimeout(ctx, oneSlot)
+	defer cancel()
+
+	// Build the topic corresponding to this column subnet and this fork digest.
+	topic := dataColumnSubnetToTopic(columnSubnet, forkDigest)
+
+	// Compute the wrapped subnet index.
+	wrappedSubIdx := columnSubnet + dataColumnSubnetVal
+
+	// Find peers if needed.
+	if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, columnSubnet); err != nil {
+		log.WithError(err).Error("Failed to find peers for data column subnet")
+		tracing.AnnotateError(span, err)
+	}
+
+	// Broadcast the data column sidecar to the network.
+	if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil {
+		log.WithError(err).Error("Failed to broadcast data column sidecar")
+		tracing.AnnotateError(span, err)
+		return
+	}
+
+	header := dataColumnSidecar.SignedBlockHeader.GetHeader()
+	slot := header.GetSlot()
+
+	slotStartTime, err := slots.StartTime(s.genesisTime, slot)
+	if err != nil {
+		log.WithError(err).Error("Failed to convert slot to time")
+	}
+
+	log.WithFields(logrus.Fields{
+		"slot":               slot,
+		"timeSinceSlotStart": time.Since(slotStartTime),
+		"root":               fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
+		"columnSubnet":       columnSubnet,
+		"blobCount":          len(dataColumnSidecar.Column),
+	}).Debug("Broadcasted data column sidecar")
+
+	// Increase the number of successful broadcasts.
+	dataColumnSidecarBroadcasts.Inc()
 }

 func (s *Service) findPeersIfNeeded(
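Callers of the former batch API now loop over sidecars themselves, picking the subnet per column. A hedged sketch of the calling pattern; svc stands in for a *Service or anything satisfying the Broadcaster interface:

// Sketch: broadcast a set of verified sidecars through the singular API.
// Each call is non-blocking; the service retries peer discovery internally
// and gives up after roughly one slot.
for _, sc := range sidecars {
	subnet := peerdas.ComputeSubnetForDataColumnSidecar(sc.Index)
	if err := svc.BroadcastDataColumnSidecar(subnet, sc); err != nil {
		log.WithError(err).Error("Broadcast data column sidecar")
	}
}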
@@ -15,10 +15,10 @@ import (
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
 	p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
 	"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
 	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
 	"github.com/OffchainLabs/prysm/v6/config/params"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
 	"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
@@ -60,6 +60,7 @@ func TestService_Broadcast(t *testing.T) {
 	topic := "/eth2/%x/testing"
 	// Set a test gossip mapping for testpb.TestSimpleMessage.
 	GossipTypeMapping[reflect.TypeOf(msg)] = topic
+	p.clock = startup.NewClock(p.genesisTime, bytesutil.ToBytes32(p.genesisValidatorsRoot))
 	digest, err := p.currentForkDigest()
 	require.NoError(t, err)
 	topic = fmt.Sprintf(topic, digest)
@@ -663,8 +664,6 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 		topicFormat = DataColumnSubnetTopicFormat
 	)

-	ctx := t.Context()
-
 	// Load the KZG trust setup.
 	err := kzg.Start()
 	require.NoError(t, err)
@@ -687,7 +686,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 	_, pkey, ipAddr := createHost(t, port)

 	service := &Service{
-		ctx:          ctx,
+		ctx:          t.Context(),
 		host:         p1.BHost,
 		pubsub:       p1.PubSub(),
 		joinedTopics: map[string]*pubsub.Topic{},
@@ -696,7 +695,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 		genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
 		subnetsLock:           make(map[uint64]*sync.RWMutex),
 		subnetsLockLock:       sync.Mutex{},
-		peers:                 peers.NewStatus(ctx, &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
+		peers:                 peers.NewStatus(t.Context(), &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
 		custodyInfo:           &custodyInfo{},
 	}
@@ -723,7 +722,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)

 	// Broadcast to peers and wait.
-	err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
+	err = service.BroadcastDataColumnSidecar(subnet, verifiedRoSidecar)
 	require.NoError(t, err)

 	// Receive the message.
@@ -52,7 +52,7 @@ type (
 		BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
 		BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
 		BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
-		BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
+		BroadcastDataColumnSidecar(columnSubnet uint64, dataColumnSidecar blocks.VerifiedRODataColumn) error
 	}

 	// SetStreamHandler configures p2p to handle streams of a certain topic ID.
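Renaming an interface method forces every fake to change in lockstep, as the mock hunks around this one show. A compile-time assertion is the usual guard against a mock drifting out of sync; a sketch, assuming the interface is the exported p2p.Broadcaster (placement in a _test.go file is illustrative):

// Sketch: fail the build if the test double stops satisfying the interface.
var _ p2p.Broadcaster = (*p2ptest.TestP2P)(nil)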
@@ -14,6 +14,7 @@ import (
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
 	"github.com/OffchainLabs/prysm/v6/config/features"
 	"github.com/OffchainLabs/prysm/v6/config/params"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"

@@ -91,6 +92,7 @@ type Service struct {
 	peerDisconnectionTime *cache.Cache
 	custodyInfo           *custodyInfo
 	custodyInfoLock       sync.RWMutex // Lock access to custodyInfo
+	clock                 *startup.Clock
 	allForkDigests        map[[4]byte]struct{}
 }
@@ -169,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
 }

 // BroadcastDataColumnSidecar -- fake.
-func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
+func (*FakeP2P) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
 	return nil
 }
@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
 }

 // BroadcastDataColumnSidecar broadcasts a data column for mock.
-func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
+func (m *MockBroadcaster) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
 	m.BroadcastCalled.Store(true)
 	return nil
 }
@@ -233,7 +233,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
 }

 // BroadcastDataColumnSidecar broadcasts a data column for mock.
-func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
+func (p *TestP2P) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
 	p.BroadcastCalled.Store(true)
 	return nil
 }
@@ -27,7 +27,5 @@ go_test(
 		"//testing/require:go_default_library",
 		"@com_github_ethereum_go_ethereum//common:go_default_library",
 		"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
-		"@com_github_sirupsen_logrus//:go_default_library",
-		"@com_github_sirupsen_logrus//hooks/test:go_default_library",
 	],
 )
@@ -124,28 +124,14 @@ func convertValueForJSON(v reflect.Value, tag string) interface{} {
 			if !v.Field(i).CanInterface() {
 				continue // unexported
 			}
-			jsonTag := f.Tag.Get("json")
-			if jsonTag == "-" {
-				continue
-			}
-
-			// Parse JSON tag options (e.g., "fieldname,omitempty")
-			parts := strings.Split(jsonTag, ",")
-			key := parts[0]
-
-			if key == "" {
+			key := f.Tag.Get("json")
+			if key == "" || key == "-" {
 				key = f.Name
 			}

-			fieldValue := convertValueForJSON(v.Field(i), tag)
-			m[key] = fieldValue
+			m[key] = convertValueForJSON(v.Field(i), tag)
 		}
 		return m

 	// ===== String =====
 	case reflect.String:
 		return v.String()

 	// ===== Default =====
 	default:
 		log.WithFields(log.Fields{
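The deleted branch understood tags like `json:"name,omitempty"` and skipped `json:"-"` fields outright, while the surviving code treats the raw tag string as the key. A self-contained sketch of the tag-splitting approach, with hypothetical field names for illustration:

package main

import (
	"fmt"
	"reflect"
	"strings"
)

type entry struct {
	Epoch       uint64 `json:"EPOCH"`
	MaxBlobs    uint64 `json:"MAX_BLOBS_PER_BLOCK,omitempty"`
	ForkVersion []byte `json:"-"`
}

func main() {
	t := reflect.TypeOf(entry{})
	for i := 0; i < t.NumField(); i++ {
		tag := t.Field(i).Tag.Get("json")
		if tag == "-" {
			continue // field explicitly omitted from JSON output
		}
		key := strings.Split(tag, ",")[0] // drop options such as "omitempty"
		if key == "" {
			key = t.Field(i).Name
		}
		fmt.Println(key) // prints EPOCH and MAX_BLOBS_PER_BLOCK; skips ForkVersion
	}
}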
@@ -8,7 +8,6 @@ import (
 	"math"
 	"net/http"
 	"net/http/httptest"
-	"reflect"
 	"testing"

 	"github.com/OffchainLabs/prysm/v6/api/server/structs"

@@ -18,8 +17,6 @@ import (
 	"github.com/OffchainLabs/prysm/v6/testing/require"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
-	log "github.com/sirupsen/logrus"
-	logTest "github.com/sirupsen/logrus/hooks/test"
 )

 func TestGetDepositContract(t *testing.T) {
@@ -692,27 +689,6 @@ func TestGetSpec_BlobSchedule(t *testing.T) {
 	// Check second entry - values should be strings for consistent API output
 	assert.Equal(t, "200", blobSchedule[1]["EPOCH"])
 	assert.Equal(t, "9", blobSchedule[1]["MAX_BLOBS_PER_BLOCK"])
-
-	// Verify that fields with json:"-" are NOT present in the blob schedule entries
-	for i, entry := range blobSchedule {
-		t.Run(fmt.Sprintf("entry_%d_omits_json_dash_fields", i), func(t *testing.T) {
-			// These fields have `json:"-"` in NetworkScheduleEntry and should be omitted
-			_, hasForkVersion := entry["ForkVersion"]
-			assert.Equal(t, false, hasForkVersion, "ForkVersion should be omitted due to json:\"-\"")
-
-			_, hasForkDigest := entry["ForkDigest"]
-			assert.Equal(t, false, hasForkDigest, "ForkDigest should be omitted due to json:\"-\"")
-
-			_, hasBPOEpoch := entry["BPOEpoch"]
-			assert.Equal(t, false, hasBPOEpoch, "BPOEpoch should be omitted due to json:\"-\"")
-
-			_, hasVersionEnum := entry["VersionEnum"]
-			assert.Equal(t, false, hasVersionEnum, "VersionEnum should be omitted due to json:\"-\"")
-
-			_, hasIsFork := entry["isFork"]
-			assert.Equal(t, false, hasIsFork, "isFork should be omitted due to json:\"-\"")
-		})
-	}
 }

 func TestGetSpec_BlobSchedule_NotFulu(t *testing.T) {
@@ -739,35 +715,3 @@ func TestGetSpec_BlobSchedule_NotFulu(t *testing.T) {
 	_, exists := data["BLOB_SCHEDULE"]
 	require.Equal(t, false, exists)
 }
-
-func TestConvertValueForJSON_NoErrorLogsForStrings(t *testing.T) {
-	logHook := logTest.NewLocal(log.StandardLogger())
-	defer logHook.Reset()
-
-	stringTestCases := []struct {
-		tag   string
-		value string
-	}{
-		{"CONFIG_NAME", "mainnet"},
-		{"PRESET_BASE", "mainnet"},
-		{"DEPOSIT_CONTRACT_ADDRESS", "0x00000000219ab540356cBB839Cbe05303d7705Fa"},
-		{"TERMINAL_TOTAL_DIFFICULTY", "58750000000000000000000"},
-	}
-
-	for _, tc := range stringTestCases {
-		t.Run(tc.tag, func(t *testing.T) {
-			logHook.Reset()
-
-			// Convert the string value
-			v := reflect.ValueOf(tc.value)
-			result := convertValueForJSON(v, tc.tag)
-
-			// Verify the result is correct
-			require.Equal(t, tc.value, result)
-
-			// Verify NO error was logged about unsupported field kind
-			require.LogsDoNotContain(t, logHook, "Unsupported config field kind")
-			require.LogsDoNotContain(t, logHook, "kind=string")
-		})
-	}
-}
@@ -690,10 +690,6 @@ func (s *Server) ProduceSyncCommitteeContribution(w http.ResponseWriter, r *http
 	if !ok {
 		return
 	}
-	if index >= params.BeaconConfig().SyncCommitteeSubnetCount {
-		httputil.HandleError(w, fmt.Sprintf("Subcommittee index needs to be between 0 and %d, %d is outside of this range.", params.BeaconConfig().SyncCommitteeSubnetCount-1, index), http.StatusBadRequest)
-		return
-	}
 	_, slot, ok := shared.UintFromQuery(w, r, "slot", true)
 	if !ok {
 		return
@@ -2117,27 +2117,6 @@ func TestProduceSyncCommitteeContribution(t *testing.T) {
 		server.ProduceSyncCommitteeContribution(writer, request)
 		assert.Equal(t, http.StatusServiceUnavailable, writer.Code)
 	})
-	t.Run("invalid subcommittee_index", func(t *testing.T) {
-		url := "http://example.com?slot=1&subcommittee_index=10&beacon_block_root=0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"
-		request := httptest.NewRequest(http.MethodGet, url, nil)
-		writer := httptest.NewRecorder()
-		writer.Body = &bytes.Buffer{}
-
-		// Use non-optimistic server for this test
-		server := Server{
-			CoreService: &core.Service{
-				HeadFetcher: &mockChain.ChainService{
-					SyncCommitteeIndices: []primitives.CommitteeIndex{0},
-				},
-			},
-			SyncCommitteePool:     syncCommitteePool,
-			OptimisticModeFetcher: &mockChain.ChainService{}, // Optimistic: false by default
-		}
-
-		server.ProduceSyncCommitteeContribution(writer, request)
-		assert.Equal(t, http.StatusBadRequest, writer.Code)
-		require.ErrorContains(t, "Subcommittee index needs to be between 0 and 3, 10 is outside of this range.", errors.New(writer.Body.String()))
-	})
 }

 func TestServer_RegisterValidator(t *testing.T) {
@@ -352,7 +352,7 @@ func (vs *Server) broadcastAndReceiveSidecars(
 	dataColumnSidecars []blocks.RODataColumn,
 ) error {
 	if block.Version() >= version.Fulu {
-		if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars); err != nil {
+		if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
 			return errors.Wrap(err, "broadcast and receive data columns")
 		}
 		return nil
@@ -495,22 +495,43 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
 }

 // broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
-func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn) error {
-	// We built this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
-	verifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
-	for _, sidecar := range roSidecars {
-		verifiedSidecar := blocks.NewVerifiedRODataColumn(sidecar)
-		verifiedSidecars = append(verifiedSidecars, verifiedSidecar)
-	}
-
-	// Broadcast sidecars (non blocking).
-	if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars); err != nil {
-		return errors.Wrap(err, "broadcast data column sidecars")
-	}
-
-	// In parallel, receive sidecars.
-	if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedSidecars); err != nil {
-		return errors.Wrap(err, "receive data columns")
-	}
-
-	return nil
+func (vs *Server) broadcastAndReceiveDataColumns(
+	ctx context.Context,
+	roSidecars []blocks.RODataColumn,
+	root [fieldparams.RootLength]byte,
+) error {
+	verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
+	eg, _ := errgroup.WithContext(ctx)
+	for _, roSidecar := range roSidecars {
+		// We build this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
+		verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
+		verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
+
+		eg.Go(func() error {
+			// Compute the subnet index based on the column index.
+			subnet := peerdas.ComputeSubnetForDataColumnSidecar(roSidecar.Index)
+
+			if err := vs.P2P.BroadcastDataColumnSidecar(subnet, verifiedRODataColumn); err != nil {
+				return errors.Wrap(err, "broadcast data column")
+			}
+
+			return nil
+		})
+	}
+
+	if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
+		return errors.Wrap(err, "receive data column")
+	}
+
+	for _, verifiedRODataColumn := range verifiedRODataColumns {
+		vs.OperationNotifier.OperationFeed().Send(&feed.Event{
+			Type: operation.DataColumnSidecarReceived,
+			Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return errors.Wrap(err, "wait for data columns to be broadcasted")
+	}
+
+	return nil
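ComputeSubnetForDataColumnSidecar reduces a column index to one of the data column gossip subnets. A sketch of the commonly used modulo scheme; this is an assumption about the implementation, not something this diff confirms:

// Sketch: map a column index onto a data column gossip subnet. The modulo
// scheme is assumed here; the authoritative logic lives in the peerdas package.
func computeSubnetForDataColumnSidecar(columnIndex uint64) uint64 {
	return columnIndex % params.BeaconConfig().DataColumnSidecarSubnetCount
}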
@@ -196,6 +196,7 @@ go_test(
 		"subscriber_beacon_aggregate_proof_test.go",
 		"subscriber_beacon_blocks_test.go",
 		"subscriber_data_column_sidecar_test.go",
+		"subscriber_race_test.go",
 		"subscriber_test.go",
 		"subscription_topic_handler_test.go",
 		"sync_fuzz_test.go",
@@ -416,6 +416,11 @@ func (s *Service) startDiscoveryAndSubscriptions() {
 	// Register respective pubsub handlers at state synced event.
 	s.registerSubscribers(currentEpoch, forkDigest)

+	// Initialize registeredNetworkEntry to the current network schedule entry to avoid
+	// duplicate subscriber registration on the first forkWatcher tick when the next
+	// epoch has the same digest.
+	s.registeredNetworkEntry = params.GetNetworkScheduleEntry(currentEpoch)
+
 	// Start the fork watcher.
 	go s.forkWatcher()
 }
@@ -344,15 +344,23 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
 	topic += s.cfg.p2p.Encoding().ProtocolSuffix()
 	log := log.WithField("topic", topic)

-	// Do not resubscribe already seen subscriptions.
-	ok := s.subHandler.topicExists(topic)
-	if ok {
+	// 1) Fast-path bail if it already exists.
+	if s.subHandler.topicExists(topic) {
 		log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
 		return nil
 	}

+	// 2) Otherwise, atomically reserve to block concurrent goroutines.
+	if !s.subHandler.tryReserveTopic(topic) {
+		// Someone else reserved first.
+		log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
+		return nil
+	}
+
 	if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
 		log.WithError(err).Error("Could not register validator for topic")
+		// Clean up the reservation since we're not proceeding
+		s.subHandler.removeTopic(topic)
 		return nil
 	}

@@ -362,9 +370,12 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
 		// libp2p PubSub library or a subscription request to a topic that fails to match the topic
 		// subscription filter.
 		log.WithError(err).Error("Could not subscribe topic")
+		// Clean up the reservation since we're not proceeding
+		s.subHandler.removeTopic(topic)
 		return nil
 	}

+	// Update the reservation with the actual subscription
 	s.subHandler.addTopic(sub.Topic(), sub)

 	// Pipeline decodes the incoming subscription data, runs the validation, and handles the

@@ -414,6 +425,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
 			// Cancel subscription in the event of an error, as we are
 			// now exiting topic event loop.
 			sub.Cancel()
+			// Remove topic from our tracking to allow resubscription.
+			s.subHandler.removeTopic(topic)
 			return
 		}
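The fix is a check-then-reserve pattern in which the map entry itself is the lock token and nil marks a reservation in progress. A condensed, self-contained sketch of the idea, mirroring the handler added further down in this diff:

package sync

import (
	"sync"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// topicHandler mirrors the diff's subTopicHandler in miniature.
type topicHandler struct {
	sync.RWMutex
	subTopics map[string]*pubsub.Subscription
}

// tryReserve returns true for exactly one caller per topic: the nil entry
// marks "subscription in progress" and turns concurrent goroutines away
// until removeTopic deletes it or addTopic installs the real subscription.
func (h *topicHandler) tryReserve(topic string) bool {
	h.Lock()
	defer h.Unlock()
	if _, exists := h.subTopics[topic]; exists {
		return false
	}
	h.subTopics[topic] = nil // placeholder reservation
	return true
}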
@@ -533,7 +546,15 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
 	for _, subnet := range t.missing(subnetsToJoin) {
 		// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
 		topic := t.fullTopic(subnet, "")
-		t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
+		sub := s.subscribeWithBase(topic, t.validate, t.handle)
+		// Even if sub is nil (topic already exists), we need to track the subnet
+		// to avoid repeated subscription attempts every slot.
+		if sub == nil {
+			// Topic already exists, get the existing subscription for tracking
+			fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
+			sub = s.subHandler.subForTopic(fullTopic)
+		}
+		t.track(subnet, sub)
 	}

 	return nil
@@ -267,9 +267,16 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
 		unseenIndices[sidecar.Index] = true
 	}

-	// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
-	if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
-		return nil, errors.Wrap(err, "broadcast data column sidecars")
+	// Broadcast all the data column sidecars we reconstructed but did not see via gossip.
+	for _, sidecar := range unseenSidecars {
+		// Compute the subnet for this data column sidecar.
+		subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
+
+		// Broadcast the data column sidecar.
+		if err := s.cfg.p2p.BroadcastDataColumnSidecar(subnet, sidecar); err != nil {
+			// Don't return on error on broadcast failure, just log it.
+			log.WithError(err).Error("Broadcast data column")
+		}
 	}

 	// Receive data column sidecars.
beacon-chain/sync/subscriber_race_test.go (new file, 347 lines)

@@ -0,0 +1,347 @@
package sync

import (
	"context"
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/OffchainLabs/prysm/v6/async/abool"
	mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
	p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
	mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
	"github.com/OffchainLabs/prysm/v6/testing/require"
	"google.golang.org/protobuf/proto"
)

// TestSubscriptionCleanup_MissingRemoveTopic tests the following bug:
// When a subscription's message loop fails and sub.Cancel() is called,
// removeTopic() is NOT called, leaving stale entries in the subTopics map.
// This likely causes memory leaks and prevents resubscription (missed attestations).
func TestSubscriptionCleanup_MissingRemoveTopic(t *testing.T) {
	t.Run("memory leak with repeated failures", func(t *testing.T) {
		// This test verifies that removeTopic() is called when subscription fails.
		// Fresh setup for this subtest.
		p2pService := p2ptest.NewTestP2P(t)
		gt := time.Now()
		vr := [32]byte{'A'}

		r := &Service{
			ctx: context.Background(),
			cfg: &config{
				p2p:         p2pService,
				initialSync: &mockSync.Sync{IsSyncing: false},
				chain: &mockChain.ChainService{
					ValidatorsRoot: vr,
					Genesis:        gt,
				},
				clock: startup.NewClock(gt, vr),
			},
			subHandler:   newSubTopicHandler(),
			chainStarted: abool.New(),
		}
		markInitSyncComplete(t, r)

		digest, err := r.currentForkDigest()
		require.NoError(t, err)
		p2pService.Digest = digest

		getMapSize := func() int {
			r.subHandler.RLock()
			defer r.subHandler.RUnlock()
			return len(r.subHandler.subTopics)
		}

		baseTopic := "/eth2/%x/voluntary_exit"

		// Do one cycle: subscribe, cancel, check cleanup.
		iterCtx, iterCancel := context.WithCancel(context.Background())
		r.ctx = iterCtx

		handler := func(ctx context.Context, msg proto.Message) error {
			return nil
		}

		r.markForChainStart()

		// Subscribe.
		sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
		require.NotNil(t, sub, "First subscription should succeed")

		// Verify subscribed.
		sizeAfterSubscribe := getMapSize()
		require.Equal(t, 1, sizeAfterSubscribe, "Should have 1 entry after subscribe")

		// Cancel to simulate failure.
		iterCancel()
		time.Sleep(300 * time.Millisecond)

		// Check cleanup happened - this is the core fix verification.
		sizeAfterCancel := getMapSize()
		if sizeAfterCancel != 0 {
			t.Errorf("After context cancellation, subTopics has %d entries (expected 0). "+
				"removeTopic() should have been called at line 420.",
				sizeAfterCancel)
		} else {
			t.Logf("SUCCESS: Cleanup working correctly - map size is 0 after cancellation")
		}
	})
}

// TestConcurrentSubscription_RaceCondition tests the following bug:
// Multiple goroutines can pass the topicExists() check simultaneously
// before any calls addTopic(), causing duplicate subscriptions.
func TestConcurrentSubscription_RaceCondition(t *testing.T) {
	tests := []struct {
		name          string
		numGoroutines int
		iterations    int
		useBarrier    bool
	}{
		{
			name:          "two concurrent",
			numGoroutines: 2,
			iterations:    20,
			useBarrier:    true,
		},
		{
			name:          "five concurrent",
			numGoroutines: 5,
			iterations:    15,
			useBarrier:    true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			duplicateDetected := 0

			for iter := 0; iter < tt.iterations; iter++ {
				// Fresh setup for each iteration.
				p2pService := p2ptest.NewTestP2P(t)
				gt := time.Now()
				vr := [32]byte{'A'}

				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

				r := &Service{
					ctx: ctx,
					cfg: &config{
						p2p:         p2pService,
						initialSync: &mockSync.Sync{IsSyncing: false},
						chain: &mockChain.ChainService{
							ValidatorsRoot: vr,
							Genesis:        gt,
						},
						clock: startup.NewClock(gt, vr),
					},
					subHandler:   newSubTopicHandler(),
					chainStarted: abool.New(),
				}
				markInitSyncComplete(t, r)

				digest, err := r.currentForkDigest()
				require.NoError(t, err)
				p2pService.Digest = digest

				baseTopic := "/eth2/%x/voluntary_exit"

				r.markForChainStart()

				// Track successful subscriptions.
				successfulSubs := atomic.Int32{}
				checksPassed := atomic.Int32{}

				// Barrier to synchronize goroutine starts.
				var barrier sync.WaitGroup
				if tt.useBarrier {
					barrier.Add(tt.numGoroutines)
				}
				startSignal := make(chan struct{})

				var wg sync.WaitGroup

				// Launch concurrent subscription attempts.
				for i := 0; i < tt.numGoroutines; i++ {
					wg.Add(1)
					go func() {
						defer wg.Done()

						if tt.useBarrier {
							barrier.Done()
							barrier.Wait()
						}

						<-startSignal

						// Attempt subscription;
						// ideally only one goroutine should get a non-nil subscription.
						handler := func(ctx context.Context, msg proto.Message) error {
							return nil
						}

						sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
						if sub != nil {
							successfulSubs.Add(1)
						}
						// Count how many goroutines attempted (for stats).
						checksPassed.Add(1)
					}()
				}

				// Wait for all goroutines to be ready.
				if tt.useBarrier {
					barrier.Wait()
				}
				time.Sleep(10 * time.Millisecond)

				// Start all goroutines simultaneously.
				close(startSignal)

				// Wait for completion.
				wg.Wait()
				time.Sleep(200 * time.Millisecond)

				// Check results.
				subs := successfulSubs.Load()
				attempts := checksPassed.Load()

				r.subHandler.RLock()
				finalMapSize := len(r.subHandler.subTopics)
				r.subHandler.RUnlock()

				// Ideally only ONE goroutine should successfully subscribe.
				// If more than one succeeds, a race condition exists.
				if subs > 1 {
					duplicateDetected++
					t.Logf("Iteration %d: RACE DETECTED - %d goroutines attempted, "+
						"%d successful subscriptions (expected 1), final map size: %d",
						iter, attempts, subs, finalMapSize)
				}

				// The map should have exactly 0 or 1 entry.
				if finalMapSize > 1 {
					t.Errorf("Iteration %d: INCONSISTENT STATE - map has %d entries (expected 0-1). "+
						"This indicates multiple goroutines subscribed concurrently.",
						iter, finalMapSize)
				}

				// Cleanup.
				cancel()
				r.subHandler.Lock()
				for topic := range r.subHandler.subTopics {
					sub := r.subHandler.subTopics[topic]
					if sub != nil {
						sub.Cancel()
					}
					delete(r.subHandler.subTopics, topic)
				}
				r.subHandler.Unlock()
			}

			if duplicateDetected > 0 {
				racePercentage := float64(duplicateDetected) / float64(tt.iterations) * 100
				t.Errorf("RACE CONDITION EXISTS in %d/%d iterations (%.1f%%). "+
					"Multiple goroutines successfully subscribed (only 1 expected). ",
					duplicateDetected, tt.iterations, racePercentage)
			} else {
				t.Logf("SUCCESS: No race condition! Only 1 subscription succeeded in all %d iterations", tt.iterations)
			}
		})
	}
}

// TestMemoryGrowth_SubscriptionFailures demonstrates memory growth over time.
func TestMemoryGrowth_SubscriptionFailures(t *testing.T) {
	p2pService := p2ptest.NewTestP2P(t)
	gt := time.Now()
	vr := [32]byte{'A'}

	r := &Service{
		ctx: context.Background(),
		cfg: &config{
			p2p:         p2pService,
			initialSync: &mockSync.Sync{IsSyncing: false},
			chain: &mockChain.ChainService{
				ValidatorsRoot: vr,
				Genesis:        gt,
			},
			clock: startup.NewClock(gt, vr),
		},
		subHandler:   newSubTopicHandler(),
		chainStarted: abool.New(),
	}
	markInitSyncComplete(t, r)

	digest, err := r.currentForkDigest()
	require.NoError(t, err)
	p2pService.Digest = digest

	baseTopic := "/eth2/%x/voluntary_exit"

	getMapSize := func() int {
		r.subHandler.RLock()
		defer r.subHandler.RUnlock()
		return len(r.subHandler.subTopics)
	}

	failures := 50
	var memStats runtime.MemStats

	runtime.ReadMemStats(&memStats)
	startAlloc := memStats.Alloc

	for i := 0; i < failures; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		r.ctx = ctx
		r.markForChainStart()

		handler := func(ctx context.Context, msg proto.Message) error {
			return nil
		}

		sub := r.subscribeWithBase(baseTopic, r.noopValidator, handler)
		if sub != nil {
			// Cancel immediately to simulate failure.
			cancel()
			time.Sleep(100 * time.Millisecond)
		}

		if i%10 == 0 {
			runtime.ReadMemStats(&memStats)
			currentAlloc := memStats.Alloc
			growth := currentAlloc - startAlloc
			t.Logf("After %d failures: subTopics size=%d, heap growth=%d KB",
				i, getMapSize(), growth/1024)
		}
	}

	finalSize := getMapSize()
	runtime.ReadMemStats(&memStats)
	finalAlloc := memStats.Alloc

	t.Logf("Final results: %d subscription failures", failures)
	t.Logf("  subTopics map size: %d entries", finalSize)
	t.Logf("  Start heap: %d KB, Final heap: %d KB", startAlloc/1024, finalAlloc/1024)

	// With the bug, even one stale entry is a problem because it prevents resubscription.
	if finalSize > 0 {
		t.Errorf("MEMORY LEAK / STALE ENTRY: After %d failures, %d stale entries remain in subTopics map (expected 0). "+
			"Even 1 stale entry prevents resubscription, causing missed attestations in production.",
			failures, finalSize)
	}

	// Check if heap grew significantly (handle wraparound by checking if finalAlloc >= startAlloc).
	if finalAlloc >= startAlloc {
		totalGrowth := finalAlloc - startAlloc
		if totalGrowth > 50*1024 { // 50 KB threshold
			t.Logf("NOTE: Heap grew by %d KB over %d failures. ",
				totalGrowth/1024, failures)
		}
	} else {
		t.Logf("NOTE: Heap decreased (GC ran), cannot measure growth accurately")
	}
}
@@ -26,13 +26,22 @@ func newSubTopicHandler() *subTopicHandler {
 func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
 	s.Lock()
 	defer s.Unlock()
+
+	// Check if this is updating a reserved entry (nil subscription)
+	existingSub, exists := s.subTopics[topic]
+	wasReserved := exists && existingSub == nil
+
 	s.subTopics[topic] = sub
 	digest, err := p2p.ExtractGossipDigest(topic)
 	if err != nil {
 		log.WithError(err).Error("Could not retrieve digest")
 		return
 	}
-	s.digestMap[digest] += 1
+
+	// Only increment digest count if this is a new topic (not just updating a reservation)
+	if !wasReserved {
+		s.digestMap[digest] += 1
+	}
 }

 func (s *subTopicHandler) topicExists(topic string) bool {

@@ -42,25 +51,57 @@ func (s *subTopicHandler) topicExists(topic string) bool {
 	return ok
 }

+// tryReserveTopic atomically checks if a topic exists and reserves it if not.
+// Returns true if the topic was successfully reserved (didn't exist before),
+// false if the topic already exists or is reserved.
+// This prevents the race condition where multiple goroutines check topicExists()
+// simultaneously and both proceed to subscribe.
+func (s *subTopicHandler) tryReserveTopic(topic string) bool {
+	s.Lock()
+	defer s.Unlock()
+
+	// Check if topic already exists or is reserved
+	if _, exists := s.subTopics[topic]; exists {
+		return false
+	}
+
+	// Reserve the topic with a nil placeholder
+	// This will be updated with the actual subscription later
+	s.subTopics[topic] = nil
+	return true
+}
+
 func (s *subTopicHandler) removeTopic(topic string) {
 	s.Lock()
 	defer s.Unlock()
+
+	// Check if topic exists and whether it was just a reservation (nil)
+	existingSub, exists := s.subTopics[topic]
+	if !exists {
+		return
+	}
+	wasReserved := existingSub == nil
+
 	delete(s.subTopics, topic)
-	digest, err := p2p.ExtractGossipDigest(topic)
-	if err != nil {
-		log.WithError(err).Error("Could not retrieve digest")
-		return
-	}
-	currAmt, ok := s.digestMap[digest]
-	// Should never be possible, is a
-	// defensive check.
-	if !ok || currAmt <= 0 {
-		delete(s.digestMap, digest)
-		return
-	}
-	s.digestMap[digest] -= 1
-	if s.digestMap[digest] == 0 {
-		delete(s.digestMap, digest)
-	}
+
+	// Only decrement digest count if this wasn't just a reservation
+	if !wasReserved {
+		digest, err := p2p.ExtractGossipDigest(topic)
+		if err != nil {
+			log.WithError(err).Error("Could not retrieve digest")
+			return
+		}
+		currAmt, ok := s.digestMap[digest]
+		// Should never be possible, is a
+		// defensive check.
+		if !ok || currAmt <= 0 {
+			delete(s.digestMap, digest)
+			return
+		}
+		s.digestMap[digest] -= 1
+		if s.digestMap[digest] == 0 {
+			delete(s.digestMap, digest)
+		}
+	}
 }
@@ -1,3 +0,0 @@
-### Fixed
-
-- Fixing Unsupported config field kind; value forwarded verbatim errors for type string.

@@ -1,3 +0,0 @@
-### Fixed
-
-- fix /eth/v1/config/spec endpoint to properly skip omitted values.

@@ -1,3 +0,0 @@
-### Changed
-
-- Add sources for compute_fork_digest to specrefs

@@ -1,2 +0,0 @@
-### Changed
-- `c-kzg-4844`: Update from `v2.1.1` to `v2.1.5`

@@ -1,2 +0,0 @@
-### Changed
-- Aggregate logs when broadcasting data column sidecars (one per root instead of one per sidecar)

@@ -1,2 +0,0 @@
-### Ignored
-- P2P service: Remove unused clock.

@@ -1,4 +0,0 @@
-### Fixed
-
-- Fix ProduceSyncCommitteeContribution not returning error when committee index is out of range
-

@@ -1,3 +0,0 @@
-### Ignored
-
-- Changed github action runners from `ubuntu-latest` to `ubuntu-4`

@@ -1,3 +0,0 @@
-### Changed
-
-- Bazel builds with `--config=release` now properly apply `--strip=always` to strip debug symbols from the release assets.
@@ -10,7 +10,6 @@ go_library(
 	deps = [
 		"//cmd/prysmctl/checkpointsync:go_default_library",
 		"//cmd/prysmctl/db:go_default_library",
-		"//cmd/prysmctl/logging:go_default_library",
 		"//cmd/prysmctl/p2p:go_default_library",
 		"//cmd/prysmctl/testnet:go_default_library",
 		"//cmd/prysmctl/validator:go_default_library",
@@ -1,29 +0,0 @@
-load("@prysm//tools/go:def.bzl", "go_library", "go_test")
-
-go_library(
-    name = "go_default_library",
-    srcs = [
-        "commands.go",
-        "json_to_text.go",
-    ],
-    importpath = "github.com/OffchainLabs/prysm/v6/cmd/prysmctl/logging",
-    visibility = ["//visibility:public"],
-    deps = [
-        "//runtime/logging/logrus-prefixed-formatter:go_default_library",
-        "@com_github_sirupsen_logrus//:go_default_library",
-        "@com_github_urfave_cli_v2//:go_default_library",
-    ],
-)
-
-go_test(
-    name = "go_default_test",
-    srcs = ["json_to_text_test.go"],
-    deps = [
-        ":go_default_library",
-        "//runtime/logging/logrus-prefixed-formatter:go_default_library",
-        "//testing/assert:go_default_library",
-        "//testing/require:go_default_library",
-        "@com_github_joonix_log//:go_default_library",
-        "@com_github_sirupsen_logrus//:go_default_library",
-    ],
-)
@@ -1,76 +0,0 @@
package logging

import (
	"bufio"
	"fmt"
	"io"
	"os"

	"github.com/urfave/cli/v2"
)

var Commands = []*cli.Command{
	{
		Name:    "logs",
		Aliases: []string{"l", "logging"},
		Usage:   "Translate logs from fluentd or json to unstructured text logs",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:  "from",
				Usage: "Input log format (fluentd, text, json)",
				Value: "fluentd",
			},
			&cli.StringFlag{
				Name:  "to",
				Usage: "Output log format (fluentd, text, json)",
				Value: "text",
			},
		},
		Action: func(ctx *cli.Context) error {
			from := ctx.String("from")
			to := ctx.String("to")

			// Validate flags
			validFormats := map[string]bool{"fluentd": true, "text": true, "json": true}
			if !validFormats[from] {
				return fmt.Errorf("invalid --from format: %s. Must be one of: fluentd, text, json", from)
			}
			if !validFormats[to] {
				return fmt.Errorf("invalid --to format: %s. Must be one of: fluentd, text, json", to)
			}

			// Only fluentd to text is currently implemented
			if from != "fluentd" || to != "text" {
				return fmt.Errorf("only fluentd to text translation is currently supported")
			}

			// Read from stdin line by line
			scanner := bufio.NewScanner(os.Stdin)
			for scanner.Scan() {
				line := scanner.Text()
				if line == "" {
					continue
				}

				// Translate the log line
				translated, err := TranslateFluentdtoUnstructuredLog(line)
				if err != nil {
					// Write error to stderr and continue processing
					fmt.Fprintf(os.Stderr, "Error translating line: %v\n", err)
					continue
				}

				// Write to stdout (without extra newline as TranslateFluentdtoUnstructuredLog adds one)
				if _, err := io.WriteString(os.Stdout, translated); err != nil {
					return fmt.Errorf("failed to write output: %w", err)
				}
			}

			if err := scanner.Err(); err != nil {
				return fmt.Errorf("error reading input: %w", err)
			}

			return nil
		},
	},
}

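As wired above, the logs subcommand streams stdin to stdout line by line, so a typical invocation (hypothetical; assuming a prysmctl binary built with this package) would be `prysmctl logs --from=fluentd --to=text < beacon.fluentd.json`, with untranslatable lines reported on stderr and skipped.
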
@@ -1,108 +0,0 @@
package logging

import (
	"encoding/json"
	"strings"
	"time"

	prefixed "github.com/OffchainLabs/prysm/v6/runtime/logging/logrus-prefixed-formatter"
	"github.com/sirupsen/logrus"
)

// TranslateFluentdtoUnstructuredLog accepts a JSON object as a string and converts it to Prysm's
// default unstructured text logger.
func TranslateFluentdtoUnstructuredLog(s string) (string, error) {
	// Parse the JSON input
	var data map[string]interface{}
	if err := json.Unmarshal([]byte(s), &data); err != nil {
		return "", err
	}

	// Create a logrus entry
	entry := &logrus.Entry{
		Data: make(logrus.Fields),
	}

	// Extract timestamp if present, otherwise use zero time.
	// This matches the test expectations and is fine since we'll only
	// use this for translating existing logs that don't have timestamps.
	if ts, ok := data["timestamp"].(string); ok {
		// Try to parse the timestamp
		if parsedTime, err := time.Parse(time.RFC3339, ts); err == nil {
			entry.Time = parsedTime
		} else {
			entry.Time = time.Time{} // Zero time if parse fails
		}
		delete(data, "timestamp")
	} else if ts, ok := data["time"].(string); ok {
		// Alternative field name
		if parsedTime, err := time.Parse(time.RFC3339, ts); err == nil {
			entry.Time = parsedTime
		} else {
			entry.Time = time.Time{} // Zero time if parse fails
		}
		delete(data, "time")
	} else {
		// No timestamp in JSON, use zero time (will show as 0001-01-01)
		entry.Time = time.Time{}
	}

	// Extract message and severity
	if msg, ok := data["message"].(string); ok {
		entry.Message = msg
		delete(data, "message")
	}

	if severity, ok := data["severity"].(string); ok {
		// Convert severity to logrus level
		level, err := logrus.ParseLevel(strings.ToLower(severity))
		if err != nil {
			// Default to info if we can't parse the level
			entry.Level = logrus.InfoLevel
		} else {
			entry.Level = level
		}
		delete(data, "severity")
	} else {
		entry.Level = logrus.InfoLevel
	}

	// All remaining fields go into Data.
	// Convert float64 to int64 if they're whole numbers to avoid scientific notation.
	for k, v := range data {
		switch val := v.(type) {
		case float64:
			// Check if it's a whole number
			if val == float64(int64(val)) {
				entry.Data[k] = int64(val)
			} else {
				entry.Data[k] = val
			}
		case float32:
			// Check if it's a whole number
			if val == float32(int64(val)) {
				entry.Data[k] = int64(val)
			} else {
				entry.Data[k] = val
			}
		default:
			entry.Data[k] = v
		}
	}

	// Use the prefixed formatter to format the entry.
	formatter := &prefixed.TextFormatter{
		FullTimestamp:   true,
		TimestampFormat: "2006-01-02 15:04:05.00", // Match beacon-chain format
		DisableColors:   false,
		ForceColors:     true, // Force colors even when not a TTY
		ForceFormatting: true, // Force formatted output even when not a TTY
	}

	formatted, err := formatter.Format(entry)
	if err != nil {
		return "", err
	}

	return string(formatted), nil
}

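For reference, a minimal sketch of driving the translator above directly from Go (an illustration only, assuming the logging package still exists at this import path on the parent branch; the sample JSON line and its field values are made up):

package main

import (
	"fmt"

	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/logging"
)

func main() {
	// An illustrative fluentd-style JSON line; the field names match those
	// handled by TranslateFluentdtoUnstructuredLog above.
	line := `{"severity":"debug","message":"Synced new block","prefix":"blockchain","slot":529,"timestamp":"2024-01-02T15:04:05Z"}`

	out, err := logging.TranslateFluentdtoUnstructuredLog(line)
	if err != nil {
		fmt.Println("translate failed:", err)
		return
	}
	// The prefixed formatter already appends a trailing newline.
	fmt.Print(out)
}
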
@@ -1,95 +0,0 @@
package logging_test

import (
	"fmt"
	"testing"

	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/logging"
	prefixed "github.com/OffchainLabs/prysm/v6/runtime/logging/logrus-prefixed-formatter"
	"github.com/OffchainLabs/prysm/v6/testing/assert"
	"github.com/OffchainLabs/prysm/v6/testing/require"
	joonix "github.com/joonix/log"
	"github.com/sirupsen/logrus"
)

type testCase struct {
	input  string
	output string
}

func TestTranslateFluentdtoUnstructuredLog(t *testing.T) {
	tests := []testCase{
		createTestCaseFluentdToText(t, &logrus.Entry{
			Data: logrus.Fields{
				"prefix": "p2p",
				"error":  "something really bad happened",
				"slot":   529,
			},
			Level:   logrus.DebugLevel,
			Message: "Failed to process something not very important",
		}),
		createTestCaseFluentdToText(t, &logrus.Entry{
			Data: logrus.Fields{
				"prefix": "core",
				"error":  "something really really bad happened",
				"slot":   530,
			},
			Level:   logrus.ErrorLevel,
			Message: "Failed to process something very important",
		}),
		createTestCaseFluentdToText(t, &logrus.Entry{
			Data: logrus.Fields{
				"prefix": "core",
				"slot":   100_000_000,
				"hash":   "0xabcdef",
			},
			Level:   logrus.InfoLevel,
			Message: "Processed something successfully",
		}),
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("scenario_%d", i), func(t *testing.T) {
			t.Logf("Input was %v", tt.input)
			got, err := logging.TranslateFluentdtoUnstructuredLog(tt.input)
			assert.NoError(t, err)
			require.Equal(t, tt.output, got, "Did not get expected output")
		})
	}
}

func createTestCaseFluentdToText(t *testing.T, e *logrus.Entry) testCase {
	return testCase{
		input:  logToString(t, fluentdFormat(t), e),
		output: logToString(t, textFormat(), e),
	}
}

type formatter interface {
	Format(entry *logrus.Entry) ([]byte, error)
}

func logToString(t *testing.T, f formatter, e *logrus.Entry) string {
	b, err := f.Format(e)
	require.NoError(t, err)
	return string(b)
}

func fluentdFormat(t *testing.T) formatter {
	f := joonix.NewFormatter()
	require.NoError(t, joonix.DisableTimestampFormat(f))
	return f
}

func textFormat() formatter {
	formatter := new(prefixed.TextFormatter)
	formatter.FullTimestamp = true
	formatter.TimestampFormat = "2006-01-02 15:04:05.00"
	formatter.DisableColors = false
	formatter.ForceColors = true     // Force colors to match the implementation
	formatter.ForceFormatting = true // Force formatted output
	return formatter
}

@@ -5,7 +5,6 @@ import (

	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/checkpointsync"
	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/db"
	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/logging"
	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/p2p"
	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/testnet"
	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/validator"
@@ -33,5 +32,4 @@ func init() {
	prysmctlCommands = append(prysmctlCommands, testnet.Commands...)
	prysmctlCommands = append(prysmctlCommands, weaksubjectivity.Commands...)
	prysmctlCommands = append(prysmctlCommands, validator.Commands...)
	prysmctlCommands = append(prysmctlCommands, logging.Commands...)
}

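The init wiring above follows urfave/cli's pattern of concatenating per-package command slices into one tree. A trimmed-down sketch of how such a slice becomes a runnable app (hypothetical; the real cmd/prysmctl main has more wiring):

package main

import (
	"log"
	"os"

	"github.com/OffchainLabs/prysm/v6/cmd/prysmctl/logging"
	"github.com/urfave/cli/v2"
)

var prysmctlCommands []*cli.Command

func init() {
	// Each subpackage contributes its own []*cli.Command slice.
	prysmctlCommands = append(prysmctlCommands, logging.Commands...)
}

func main() {
	app := &cli.App{
		Name:     "prysmctl",
		Commands: prysmctlCommands,
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
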
8
deps.bzl
8
deps.bzl
@@ -776,8 +776,8 @@ def prysm_deps():
        importpath = "github.com/ethereum/c-kzg-4844/v2",
        patch_args = ["-p1"],
        patches = ["//third_party:com_github_ethereum_c_kzg_4844.patch"],
        sum = "h1:aVtoLK5xwJ6c5RiqO8g8ptJ5KU+2Hdquf6G3aXiHh5s=",
        version = "v2.1.5",
        sum = "h1:KhzBVjmURsfr1+S3k/VE35T02+AW2qU9t9gr4R6YpSo=",
        version = "v2.1.1",
    )
    go_repository(
        name = "com_github_ethereum_go_ethereum",
@@ -3318,8 +3318,8 @@ def prysm_deps():
        importpath = "github.com/supranational/blst",
        patch_args = ["-p1"],
        patches = ["//third_party:com_github_supranational_blst.patch"],
        sum = "h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw=",
        version = "v0.3.16-0.20250831170142-f48500c1fdbe",
        sum = "h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo=",
        version = "v0.3.14",
    )
    go_repository(
        name = "com_github_syndtr_goleveldb",

4
go.mod
4
go.mod
@@ -14,7 +14,7 @@ require (
	github.com/dgraph-io/ristretto/v2 v2.2.0
	github.com/dustin/go-humanize v1.0.1
	github.com/emicklei/dot v0.11.0
	github.com/ethereum/c-kzg-4844/v2 v2.1.5
	github.com/ethereum/c-kzg-4844/v2 v2.1.1
	github.com/ethereum/go-ethereum v1.15.9
	github.com/fsnotify/fsnotify v1.6.0
	github.com/ghodss/yaml v1.0.0
@@ -70,7 +70,7 @@ require (
	github.com/spf13/afero v1.10.0
	github.com/status-im/keycard-go v0.2.0
	github.com/stretchr/testify v1.10.0
	github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe
	github.com/supranational/blst v0.3.14
	github.com/thomaso-mirodin/intmath v0.0.0-20160323211736-5dc6d854e46e
	github.com/trailofbits/go-mutexasserts v0.0.0-20250212181730-4c2b8e9e784b
	github.com/tyler-smith/go-bip39 v1.1.0

8
go.sum
8
go.sum
@@ -234,8 +234,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA=
github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/c-kzg-4844/v2 v2.1.5 h1:aVtoLK5xwJ6c5RiqO8g8ptJ5KU+2Hdquf6G3aXiHh5s=
github.com/ethereum/c-kzg-4844/v2 v2.1.5/go.mod h1:u59hRTTah4Co6i9fDWtiCjTrblJv0UwsqZKCc0GfgUs=
github.com/ethereum/c-kzg-4844/v2 v2.1.1 h1:KhzBVjmURsfr1+S3k/VE35T02+AW2qU9t9gr4R6YpSo=
github.com/ethereum/c-kzg-4844/v2 v2.1.1/go.mod h1:TC48kOKjJKPbN7C++qIgt0TJzZ70QznYR7Ob+WXl57E=
github.com/ethereum/go-ethereum v1.15.9 h1:bRra1zi+/q+qyXZ6fylZOrlaF8kDdnlTtzNTmNHfX+g=
github.com/ethereum/go-ethereum v1.15.9/go.mod h1:+S9k+jFzlyVTNcYGvqFhzN/SFhI6vA+aOY4T5tLSPL0=
github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
@@ -1021,8 +1021,8 @@ github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw=
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/supranational/blst v0.3.14 h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo=
github.com/supranational/blst v0.3.14/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=

@@ -18,6 +18,7 @@ exceptions:
  - UPDATE_TIMEOUT#altair

  # Not implemented: gloas (future fork)
  - KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH_GLOAS#gloas
  - MAX_PAYLOAD_ATTESTATIONS#gloas
  - PTC_SIZE#gloas
@@ -214,6 +215,7 @@ exceptions:

  # Not implemented: altair
  - compute_sync_committee_period_at_slot#altair
  - compute_timestamp_at_slot#bellatrix
  - get_contribution_and_proof#altair
  - get_contribution_due_ms#altair
  - get_index_for_new_validator#altair
@@ -283,10 +285,12 @@ exceptions:
  - upgrade_lc_update_to_electra#electra

  # Not implemented: fulu
  - compute_fork_digest#fulu
  - compute_matrix#fulu
  - get_blob_parameters#fulu
  - get_data_column_sidecars_from_block#fulu
  - get_data_column_sidecars_from_column_sidecar#fulu
  - get_extended_sample_count#fulu
  - recover_matrix#fulu

  # Not implemented: gloas (future fork)

@@ -1,10 +1,3 @@
- name: AGGREGATE_DUE_BPS
  sources: []
  spec: |
    <spec config_var="AGGREGATE_DUE_BPS" fork="phase0" hash="7eaa811a">
    AGGREGATE_DUE_BPS: uint64 = 6667
    </spec>

- name: ALTAIR_FORK_EPOCH
  sources:
    - file: config/params/config.go
@@ -25,11 +18,11 @@
    ALTAIR_FORK_VERSION: Version = '0x01000000'
    </spec>

- name: ATTESTATION_DUE_BPS
- name: AGGREGATE_DUE_BPS
  sources: []
  spec: |
    <spec config_var="ATTESTATION_DUE_BPS" fork="phase0" hash="929dd1c9">
    ATTESTATION_DUE_BPS: uint64 = 3333
    <spec config_var="AGGREGATE_DUE_BPS" fork="phase0" hash="7eaa811a">
    AGGREGATE_DUE_BPS: uint64 = 6667
    </spec>

- name: ATTESTATION_PROPAGATION_SLOT_RANGE
@@ -72,6 +65,13 @@
    ATTESTATION_SUBNET_PREFIX_BITS: int = 6
    </spec>

- name: ATTESTATION_DUE_BPS
  sources: []
  spec: |
    <spec config_var="ATTESTATION_DUE_BPS" fork="phase0" hash="929dd1c9">
    ATTESTATION_DUE_BPS: uint64 = 3333
    </spec>

- name: BALANCE_PER_ADDITIONAL_CUSTODY_GROUP
  sources:
    - file: config/params/config.go
@@ -163,13 +163,6 @@
    CHURN_LIMIT_QUOTIENT: uint64 = 65536
    </spec>

- name: CONTRIBUTION_DUE_BPS
  sources: []
  spec: |
    <spec config_var="CONTRIBUTION_DUE_BPS" fork="altair" hash="a3808203">
    CONTRIBUTION_DUE_BPS: uint64 = 6667
    </spec>

- name: CUSTODY_REQUIREMENT
  sources:
    - file: config/params/config.go
@@ -540,13 +533,6 @@
    NUMBER_OF_CUSTODY_GROUPS = 128
    </spec>

- name: PROPOSER_REORG_CUTOFF_BPS
  sources: []
  spec: |
    <spec config_var="PROPOSER_REORG_CUTOFF_BPS" fork="phase0" hash="a487cc43">
    PROPOSER_REORG_CUTOFF_BPS: uint64 = 1667
    </spec>

- name: PROPOSER_SCORE_BOOST
  sources:
    - file: config/params/config.go
@@ -557,6 +543,13 @@
    PROPOSER_SCORE_BOOST: uint64 = 40
    </spec>

- name: PROPOSER_REORG_CUTOFF_BPS
  sources: []
  spec: |
    <spec config_var="PROPOSER_REORG_CUTOFF_BPS" fork="phase0" hash="a487cc43">
    PROPOSER_REORG_CUTOFF_BPS: uint64 = 1667
    </spec>

- name: REORG_HEAD_WEIGHT_THRESHOLD
  sources:
    - file: config/params/config.go
@@ -617,6 +610,13 @@
    SECONDS_PER_SLOT: uint64 = 12
    </spec>

- name: SLOT_DURATION_MS
  sources: []
  spec: |
    <spec config_var="SLOT_DURATION_MS" fork="phase0" hash="b6d4ba6d">
    SLOT_DURATION_MS: uint64 = 12000
    </spec>

- name: SHARD_COMMITTEE_PERIOD
  sources:
    - file: config/params/config.go
@@ -627,13 +627,6 @@
    SHARD_COMMITTEE_PERIOD: uint64 = 256
    </spec>

- name: SLOT_DURATION_MS
  sources: []
  spec: |
    <spec config_var="SLOT_DURATION_MS" fork="phase0" hash="b6d4ba6d">
    SLOT_DURATION_MS: uint64 = 12000
    </spec>

- name: SUBNETS_PER_NODE
  sources:
    - file: config/params/config.go
@@ -644,13 +637,6 @@
    SUBNETS_PER_NODE = 2
    </spec>

- name: SYNC_MESSAGE_DUE_BPS
  sources: []
  spec: |
    <spec config_var="SYNC_MESSAGE_DUE_BPS" fork="altair" hash="791b29d8">
    SYNC_MESSAGE_DUE_BPS: uint64 = 3333
    </spec>

- name: TERMINAL_BLOCK_HASH
  sources:
    - file: config/params/config.go
@@ -690,3 +676,18 @@
    <spec config_var="VALIDATOR_CUSTODY_REQUIREMENT" fork="fulu" hash="4dfc4457">
    VALIDATOR_CUSTODY_REQUIREMENT = 8
    </spec>

- name: CONTRIBUTION_DUE_BPS
  sources: []
  spec: |
    <spec config_var="CONTRIBUTION_DUE_BPS" fork="altair" hash="a3808203">
    CONTRIBUTION_DUE_BPS: uint64 = 6667
    </spec>

- name: SYNC_MESSAGE_DUE_BPS
  sources: []
  spec: |
    <spec config_var="SYNC_MESSAGE_DUE_BPS" fork="altair" hash="791b29d8">
    SYNC_MESSAGE_DUE_BPS: uint64 = 3333
    </spec>

@@ -1,3 +1,4 @@

- name: BASE_REWARDS_PER_EPOCH
  sources:
    - file: config/params/config.go

@@ -1,3 +1,4 @@

- name: BlobParameters
  sources: []
  spec: |

@@ -527,11 +527,7 @@
    </spec>

- name: compute_fork_digest#fulu
  sources:
    - file: config/params/fork.go
      search: func ForkDigest(
    - file: config/params/config.go
      search: func entryWithForkDigest(
  sources: []
  spec: |
    <spec fn="compute_fork_digest" fork="fulu" hash="e916a595">
    def compute_fork_digest(
@@ -3144,17 +3140,6 @@
    return hash(domain_type + uint_to_bytes(epoch) + mix)
    </spec>

- name: get_slot_component_duration_ms
  sources: []
  spec: |
    <spec fn="get_slot_component_duration_ms" fork="phase0" hash="b81504df">
    def get_slot_component_duration_ms(basis_points: uint64) -> uint64:
        """
        Calculate the duration of a slot component in milliseconds.
        """
        return basis_points * SLOT_DURATION_MS // BASIS_POINTS
    </spec>

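To make the basis-point arithmetic concrete: with SLOT_DURATION_MS = 12000 as pinned above, ATTESTATION_DUE_BPS = 3333 maps to 3999 ms (about a third of the slot) and AGGREGATE_DUE_BPS = 6667 to 8000 ms. A small Go sketch of the same integer math follows; BASIS_POINTS = 10000 is an assumption here, since its definition does not appear in this excerpt:

package main

import "fmt"

const (
	slotDurationMS uint64 = 12000 // SLOT_DURATION_MS above
	basisPoints    uint64 = 10000 // assumed value of BASIS_POINTS
)

// Mirrors the spec's get_slot_component_duration_ms.
func slotComponentDurationMS(bps uint64) uint64 {
	return bps * slotDurationMS / basisPoints
}

func main() {
	fmt.Println(slotComponentDurationMS(3333)) // ATTESTATION_DUE_BPS -> 3999 ms
	fmt.Println(slotComponentDurationMS(6667)) // AGGREGATE_DUE_BPS   -> 8000 ms
}
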
- name: get_slot_signature
  sources: []
  spec: |
@@ -3175,6 +3160,17 @@
    return (store.time - store.genesis_time) // SECONDS_PER_SLOT
    </spec>

- name: get_slot_component_duration_ms
  sources: []
  spec: |
    <spec fn="get_slot_component_duration_ms" fork="phase0" hash="b81504df">
    def get_slot_component_duration_ms(basis_points: uint64) -> uint64:
        """
        Calculate the duration of a slot component in milliseconds.
        """
        return basis_points * SLOT_DURATION_MS // BASIS_POINTS
    </spec>

- name: get_source_deltas
  sources:
    - file: beacon-chain/core/epoch/precompute/reward_penalty.go
@@ -7283,20 +7279,6 @@
    return a - b if a > b else 0
    </spec>

- name: seconds_to_milliseconds
  sources: []
  spec: |
    <spec fn="seconds_to_milliseconds" fork="phase0" hash="b2cc9743">
    def seconds_to_milliseconds(seconds: uint64) -> uint64:
        """
        Convert seconds to milliseconds with overflow protection.
        Returns ``UINT64_MAX`` if the result would overflow.
        """
        if seconds > UINT64_MAX // 1000:
            return UINT64_MAX
        return seconds * 1000
    </spec>

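As a cross-check on the saturation guard in seconds_to_milliseconds, a direct Go transcription (an illustration, not code from this branch; math.MaxUint64 stands in for the spec's UINT64_MAX):

package main

import (
	"fmt"
	"math"
)

func secondsToMilliseconds(seconds uint64) uint64 {
	// Saturate instead of overflowing when seconds * 1000 would exceed uint64.
	if seconds > math.MaxUint64/1000 {
		return math.MaxUint64
	}
	return seconds * 1000
}

func main() {
	fmt.Println(secondsToMilliseconds(12))             // 12000
	fmt.Println(secondsToMilliseconds(math.MaxUint64)) // saturates at UINT64_MAX
}
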
- name: set_or_append_list
  sources: []
  spec: |
@@ -7534,6 +7516,20 @@
    assert block.state_root == hash_tree_root(state)
    </spec>

- name: seconds_to_milliseconds
  sources: []
  spec: |
    <spec fn="seconds_to_milliseconds" fork="phase0" hash="b2cc9743">
    def seconds_to_milliseconds(seconds: uint64) -> uint64:
        """
        Convert seconds to milliseconds with overflow protection.
        Returns ``UINT64_MAX`` if the result would overflow.
        """
        if seconds > UINT64_MAX // 1000:
            return UINT64_MAX
        return seconds * 1000
    </spec>

- name: store_target_checkpoint_state
  sources: []
  spec: |