Always pass an item through channel when processing blocks (#728)

This commit is contained in:
Yutaro Mori
2018-11-06 21:48:11 +01:00
committed by GitHub
parent 15563ae265
commit 278a81032f
2 changed files with 84 additions and 77 deletions

View File

@@ -3,6 +3,7 @@ package blockchain
import (
"context"
"errors"
"fmt"
"time"
@@ -134,6 +135,10 @@ func (c *ChainService) updateHead(processedBlock <-chan *types.Block) {
case <-c.ctx.Done():
return
case block := <-processedBlock:
if block == nil {
continue
}
h, err := block.Hash()
if err != nil {
log.Errorf("Could not hash incoming block: %v", err)
@@ -235,88 +240,89 @@ func (c *ChainService) blockProcessing(processedBlock chan<- *types.Block) {
// Listen for a newly received incoming block from the sync service.
case block := <-c.incomingBlockChan:
blockHash, err := block.Hash()
if err != nil {
log.Errorf("Failed to get hash of block: %v", err)
if err := c.processBlock(block); err != nil {
log.Error(err)
processedBlock <- nil
continue
}
if c.enablePOWChain && !c.doesPoWBlockExist(block) {
log.Errorf("Proof-of-Work chain reference in block does not exist")
continue
}
parent, err := c.beaconDB.GetBlock(block.ParentHash())
if err != nil {
log.Errorf("Could not get parent block: %v", err)
continue
}
if parent == nil {
log.Errorf("Block points to nil parent: %#x", block.ParentHash())
continue
}
aState, err := c.beaconDB.GetActiveState()
if err != nil {
log.Errorf("Failed to get active state: %v", err)
}
cState, err := c.beaconDB.GetCrystallizedState()
if err != nil {
log.Errorf("Failed to get crystallized state: %v", err)
}
if valid := block.IsValid(
c.beaconDB,
aState,
cState,
parent.SlotNumber(),
c.genesisTime,
); !valid {
log.Debug("Block failed validity conditions")
continue
}
// If the block is valid, we compute its associated state tuple (active, crystallized)
// and apply a block scoring function.
var didCycleTransition bool
if cState.IsCycleTransition(block.SlotNumber()) {
cState, err = c.executeStateTransition(cState, aState, block)
if err != nil {
log.Errorf("Initialize new cycle transition failed: %v", err)
continue
}
didCycleTransition = true
}
aState, err = aState.CalculateNewActiveState(
block,
cState,
parent.SlotNumber(),
)
if err != nil {
log.Errorf("Compute active state failed: %v", err)
continue
}
if err := c.beaconDB.SaveBlock(block); err != nil {
log.Errorf("Failed to save block: %v", err)
continue
}
if err := c.beaconDB.SaveUnfinalizedBlockState(aState, cState); err != nil {
log.Errorf("Error persisting unfinalized block's state: %v", err)
continue
}
log.Infof("Processed block: %#x", blockHash)
c.unfinalizedBlocks[blockHash] = &statePair{
crystallizedState: cState,
activeState: aState,
cycleTransition: didCycleTransition,
}
// Push the block to trigger the fork choice rule.
processedBlock <- block
}
}
}
func (c *ChainService) processBlock(block *types.Block) error {
blockHash, err := block.Hash()
if err != nil {
return fmt.Errorf("Failed to get hash of block: %v", err)
}
if c.enablePOWChain && !c.doesPoWBlockExist(block) {
return errors.New("Proof-of-Work chain reference in block does not exist")
}
parent, err := c.beaconDB.GetBlock(block.ParentHash())
if err != nil {
return fmt.Errorf("Could not get parent block: %v", err)
}
if parent == nil {
return fmt.Errorf("Block points to nil parent: %#x", block.ParentHash())
}
aState, err := c.beaconDB.GetActiveState()
if err != nil {
return fmt.Errorf("Failed to get active state: %v", err)
}
cState, err := c.beaconDB.GetCrystallizedState()
if err != nil {
return fmt.Errorf("Failed to get crystallized state: %v", err)
}
if valid := block.IsValid(
c.beaconDB,
aState,
cState,
parent.SlotNumber(),
c.genesisTime,
); !valid {
return errors.New("Block failed validity conditions")
}
// If the block is valid, we compute its associated state tuple (active, crystallized)
// and apply a block scoring function.
var didCycleTransition bool
if cState.IsCycleTransition(block.SlotNumber()) {
cState, err = c.executeStateTransition(cState, aState, block)
if err != nil {
return fmt.Errorf("Initialize new cycle transition failed: %v", err)
}
didCycleTransition = true
}
aState, err = aState.CalculateNewActiveState(
block,
cState,
parent.SlotNumber(),
)
if err != nil {
return fmt.Errorf("Compute active state failed: %v", err)
}
if err := c.beaconDB.SaveBlock(block); err != nil {
return fmt.Errorf("Failed to save block: %v", err)
}
if err := c.beaconDB.SaveUnfinalizedBlockState(aState, cState); err != nil {
return fmt.Errorf("Error persisting unfinalized block's state: %v", err)
}
log.Infof("Processed block: %#x", blockHash)
c.unfinalizedBlocks[blockHash] = &statePair{
crystallizedState: cState,
activeState: aState,
cycleTransition: didCycleTransition,
}
return nil
}

View File

@@ -144,6 +144,7 @@ func TestRunningChainServiceFaultyPOWChain(t *testing.T) {
}
chainService.incomingBlockChan <- block
<-blockChan
chainService.cancel()
exitRoutine <- true