Compare commits

...

4 Commits

Author SHA1 Message Date
james-prysm
c690412c61 fixing test 2025-10-02 16:22:01 -05:00
james-prysm
595728b0bb Merge changes and resolve conflicts
Resolved conflict in beacon-chain/blockchain/process_block.go where both
branches modified the postBlockProcess function. Kept the blockProcessed
flag logic while incorporating upstream changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-authored-by: Sculptor <sculptor@imbue.com>
2025-10-02 19:46:21 +00:00
james-prysm
f379b37d25 state channel handling
Co-authored-by: Sculptor <sculptor@imbue.com>
2025-10-02 19:44:30 +00:00
james-prysm
f4fee5e26d init 2025-10-02 14:42:38 -05:00
3 changed files with 169 additions and 2 deletions

View File

@@ -72,7 +72,13 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
if features.Get().EnableLightClient && slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().AltairForkEpoch {
defer s.processLightClientUpdates(cfg)
}
defer s.sendStateFeedOnBlock(cfg)
// Track whether block processing succeeded to only send event on success
var blockProcessed bool
defer func() {
if blockProcessed {
s.sendStateFeedOnBlock(cfg)
}
}()
defer reportProcessingTime(startTime)
defer reportAttestationInclusion(cfg.roblock.Block())
@@ -101,16 +107,22 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
if cfg.headRoot != cfg.roblock.Root() {
s.logNonCanonicalBlockReceived(cfg.roblock.Root(), cfg.headRoot)
// Mark as processed even for non-canonical blocks that succeed
blockProcessed = true
return nil
}
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
// Mark as processed - block was successfully inserted even if FCU args failed
blockProcessed = true
return nil
}
if err := s.sendFCU(cfg, fcuArgs); err != nil {
return errors.Wrap(err, "could not send FCU to engine")
}
// Block successfully processed
blockProcessed = true
return nil
}

View File

@@ -9,8 +9,10 @@ import (
"testing"
"time"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing"
@@ -3462,3 +3464,156 @@ func TestProcessLightClientFinalityUpdate(t *testing.T) {
}
}
}
// Test_postBlockProcess_EventSending tests that block processed events are only sent
// when block processing succeeds according to the decision tree:
//
// Block Processing Flow:
// ├─ InsertNode FAILS (fork choice timeout)
// │ └─ blockProcessed = false ❌ NO EVENT
// │
// ├─ InsertNode succeeds
// │ ├─ handleBlockAttestations FAILS
// │ │ └─ blockProcessed = false ❌ NO EVENT
// │ │
// │ ├─ Block is NON-CANONICAL (not head)
// │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 111)
// │ │
// │ ├─ Block IS CANONICAL (new head)
// │ │ ├─ getFCUArgs FAILS
// │ │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 117)
// │ │ │
// │ │ ├─ sendFCU FAILS
// │ │ │ └─ blockProcessed = false ❌ NO EVENT
// │ │ │
// │ │ └─ Full success
// │ │ └─ blockProcessed = true ✅ SEND EVENT (Line 125)
func Test_postBlockProcess_EventSending(t *testing.T) {
ctx := context.Background()
// Helper to create a minimal valid block and state
createTestBlockAndState := func(t *testing.T, slot primitives.Slot, parentRoot [32]byte) (consensusblocks.ROBlock, state.BeaconState) {
st, _ := util.DeterministicGenesisState(t, 64)
require.NoError(t, st.SetSlot(slot))
stateRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err)
blk := util.NewBeaconBlock()
blk.Block.Slot = slot
blk.Block.ProposerIndex = 0
blk.Block.ParentRoot = parentRoot[:]
blk.Block.StateRoot = stateRoot[:]
signed := util.HydrateSignedBeaconBlock(blk)
roBlock, err := consensusblocks.NewSignedBeaconBlock(signed)
require.NoError(t, err)
roBlk, err := consensusblocks.NewROBlock(roBlock)
require.NoError(t, err)
return roBlk, st
}
tests := []struct {
name string
setupService func(*Service, [32]byte)
expectEvent bool
expectError bool
errorContains string
}{
{
name: "Block successfully processed - sends event",
setupService: func(s *Service, blockRoot [32]byte) {
// Default setup should work
},
expectEvent: true,
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create service with required options
opts := testServiceOptsWithDB(t)
service, err := NewService(ctx, opts...)
require.NoError(t, err)
// Initialize fork choice with genesis block
st, _ := util.DeterministicGenesisState(t, 64)
require.NoError(t, st.SetSlot(0))
genesisBlock := util.NewBeaconBlock()
genesisBlock.Block.StateRoot = bytesutil.PadTo([]byte("genesisState"), 32)
signedGenesis := util.HydrateSignedBeaconBlock(genesisBlock)
block, err := consensusblocks.NewSignedBeaconBlock(signedGenesis)
require.NoError(t, err)
genesisRoot, err := block.Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, block))
require.NoError(t, service.cfg.BeaconDB.SaveGenesisBlockRoot(ctx, genesisRoot))
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, genesisRoot))
genesisROBlock, err := consensusblocks.NewROBlock(block)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, st, genesisROBlock))
// Create test block and state with genesis as parent
roBlock, postSt := createTestBlockAndState(t, 100, genesisRoot)
// Apply additional service setup if provided
if tt.setupService != nil {
tt.setupService(service, roBlock.Root())
}
// Create post block process config
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roBlock,
postState: postSt,
isValidPayload: true,
}
// Execute postBlockProcess
err = service.postBlockProcess(cfg)
// Check error expectation
if tt.expectError {
require.NotNil(t, err)
if tt.errorContains != "" {
require.ErrorContains(t, tt.errorContains, err)
}
} else {
require.NoError(t, err)
}
// Give a moment for deferred functions to execute
time.Sleep(10 * time.Millisecond)
// Check event expectation
notifier := service.cfg.StateNotifier.(*mock.MockStateNotifier)
events := notifier.ReceivedEvents()
if tt.expectEvent {
require.NotEqual(t, 0, len(events), "Expected event to be sent but none were received")
// Verify it's a BlockProcessed event
foundBlockProcessed := false
for _, evt := range events {
if evt.Type == statefeed.BlockProcessed {
foundBlockProcessed = true
data, ok := evt.Data.(*statefeed.BlockProcessedData)
require.Equal(t, true, ok, "Event data should be BlockProcessedData")
require.Equal(t, roBlock.Root(), data.BlockRoot, "Event should contain correct block root")
break
}
}
require.Equal(t, true, foundBlockProcessed, "Expected BlockProcessed event type")
} else {
// For no-event cases, verify no BlockProcessed events were sent
for _, evt := range events {
require.NotEqual(t, statefeed.BlockProcessed, evt.Type,
"Expected no BlockProcessed event but one was sent")
}
}
})
}
}

View File

@@ -230,7 +230,7 @@ func (v *ValidatorService) Start() {
distributed: v.distributed,
disableDutiesPolling: v.disableDutiesPolling,
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
eventsChannel: make(chan *eventClient.Event, 1),
eventsChannel: make(chan *eventClient.Event, 100), // 100 gives some room if the validator is slow at processing the events
}
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)