mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
sharding: fix race condition in simulator tests (#237)
Former-commit-id: d53651fc87d713f29b458544eec8f060c0f2a284 [formerly 64adf38c5615fd6b833361e8836f2f97d089b78c] Former-commit-id: 896ca03a66d8fa727897eedb970c5088db2e53b9
This commit is contained in:
@@ -41,6 +41,7 @@ func (t *LogHandler) Len() int {
|
||||
func (h *LogHandler) VerifyLogMsg(str string) {
|
||||
if h.Len() == 0 {
|
||||
h.t.Error("Expected a log, but there were none!")
|
||||
return
|
||||
}
|
||||
if l := h.Pop(); l.Msg != str {
|
||||
h.t.Errorf("Unexpected log: %v. Wanted: %s", l.Msg, str)
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages"
|
||||
"github.com/prysmaticlabs/geth-sharding/sharding/params"
|
||||
"github.com/prysmaticlabs/geth-sharding/sharding/syncer"
|
||||
"github.com/prysmaticlabs/geth-sharding/sharding/utils"
|
||||
)
|
||||
|
||||
// Simulator is a service in a shard node that simulates requests from
|
||||
@@ -29,7 +28,6 @@ type Simulator struct {
|
||||
shardID int
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
errChan chan error // Useful channel for handling errors at the service layer.
|
||||
delay time.Duration
|
||||
requestFeed *event.Feed
|
||||
}
|
||||
@@ -39,8 +37,7 @@ type Simulator struct {
|
||||
// and a shardID.
|
||||
func NewSimulator(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errChan := make(chan error)
|
||||
return &Simulator{config, client, p2p, shardID, ctx, cancel, errChan, delay, nil}, nil
|
||||
return &Simulator{config, client, p2p, shardID, ctx, cancel, delay, nil}, nil
|
||||
}
|
||||
|
||||
// Start the main loop for simulating p2p requests.
|
||||
@@ -48,8 +45,7 @@ func (s *Simulator) Start() {
|
||||
log.Info("Starting simulator service")
|
||||
|
||||
s.requestFeed = s.p2p.Feed(messages.CollationBodyRequest{})
|
||||
go utils.HandleServiceErrors(s.ctx.Done(), s.errChan)
|
||||
go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay))
|
||||
go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay), s.ctx.Done())
|
||||
}
|
||||
|
||||
// Stop the main loop for simulator requests.
|
||||
@@ -57,7 +53,6 @@ func (s *Simulator) Stop() error {
|
||||
// Triggers a cancel call in the service's context which shuts down every goroutine
|
||||
// in this service.
|
||||
defer s.cancel()
|
||||
defer close(s.errChan)
|
||||
log.Warn("Stopping simulator service")
|
||||
return nil
|
||||
}
|
||||
@@ -68,24 +63,24 @@ func (s *Simulator) Stop() error {
|
||||
// pair to perform their responsibilities. This function in particular simulates
|
||||
// requests for collation bodies that will be relayed to the appropriate proposer
|
||||
// by the p2p feed layer.
|
||||
func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, delayChan <-chan time.Time) {
|
||||
func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, delayChan <-chan time.Time, done <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
// Makes sure to close this goroutine when the service stops.
|
||||
case <-s.ctx.Done():
|
||||
case <-done:
|
||||
log.Debug("Simulator context closed, exiting goroutine")
|
||||
return
|
||||
case <-delayChan:
|
||||
blockNumber, err := reader.BlockByNumber(s.ctx, nil)
|
||||
if err != nil {
|
||||
s.errChan <- fmt.Errorf("could not fetch current block number: %v", err)
|
||||
log.Error(fmt.Sprintf("Could not fetch current block number: %v", err))
|
||||
continue
|
||||
}
|
||||
|
||||
period := new(big.Int).Div(blockNumber.Number(), big.NewInt(s.config.PeriodLength))
|
||||
req, err := syncer.RequestCollationBody(fetcher, big.NewInt(int64(s.shardID)), period)
|
||||
if err != nil {
|
||||
s.errChan <- fmt.Errorf("error constructing collation body request: %v", err)
|
||||
log.Error(fmt.Sprintf("Error constructing collation body request: %v", err))
|
||||
continue
|
||||
}
|
||||
if req != nil {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -113,6 +112,9 @@ func TestStartStop(t *testing.T) {
|
||||
// in the simulateNotaryRequests goroutine when reading the block number from
|
||||
// the mainchain via RPC.
|
||||
func TestSimulateNotaryRequests_FaultyReader(t *testing.T) {
|
||||
h := internal.NewLogHandler(t)
|
||||
log.Root().SetHandler(h)
|
||||
|
||||
shardID := 0
|
||||
server, err := p2p.NewServer()
|
||||
if err != nil {
|
||||
@@ -125,28 +127,30 @@ func TestSimulateNotaryRequests_FaultyReader(t *testing.T) {
|
||||
}
|
||||
|
||||
simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
|
||||
simulator.errChan = make(chan error)
|
||||
|
||||
go simulator.simulateNotaryRequests(&goodSMCCaller{}, &faultyReader{}, time.After(time.Second*0))
|
||||
delayChan := make(chan time.Time)
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
go func() {
|
||||
simulator.simulateNotaryRequests(&goodSMCCaller{}, &faultyReader{}, delayChan, doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
|
||||
receivedErr := <-simulator.errChan
|
||||
expectedErr := "could not fetch current block number"
|
||||
if !strings.Contains(receivedErr.Error(), expectedErr) {
|
||||
t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
|
||||
}
|
||||
delayChan <- time.Time{}
|
||||
doneChan <- struct{}{}
|
||||
h.VerifyLogMsg("Could not fetch current block number: cannot fetch block by number")
|
||||
|
||||
simulator.cancel()
|
||||
|
||||
// The context should have been canceled.
|
||||
if simulator.ctx.Err() == nil {
|
||||
t.Error("Context was not canceled")
|
||||
}
|
||||
exitRoutine <- true
|
||||
h.VerifyLogMsg("Simulator context closed, exiting goroutine")
|
||||
}
|
||||
|
||||
// This test uses a faulty SMCCaller in order to trigger an error
|
||||
// in the simulateNotaryRequests goroutine when reading the collation records
|
||||
// from the SMC.
|
||||
func TestSimulateNotaryRequests_FaultyCaller(t *testing.T) {
|
||||
h := internal.NewLogHandler(t)
|
||||
log.Root().SetHandler(h)
|
||||
|
||||
shardID := 0
|
||||
server, err := p2p.NewServer()
|
||||
if err != nil {
|
||||
@@ -159,22 +163,21 @@ func TestSimulateNotaryRequests_FaultyCaller(t *testing.T) {
|
||||
}
|
||||
|
||||
simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
|
||||
simulator.errChan = make(chan error)
|
||||
|
||||
go simulator.simulateNotaryRequests(&faultySMCCaller{}, &goodReader{}, time.After(time.Second*0))
|
||||
delayChan := make(chan time.Time)
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
go func() {
|
||||
simulator.simulateNotaryRequests(&faultySMCCaller{}, &goodReader{}, delayChan, doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
|
||||
receivedErr := <-simulator.errChan
|
||||
expectedErr := "error constructing collation body request"
|
||||
if !strings.Contains(receivedErr.Error(), expectedErr) {
|
||||
t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
|
||||
}
|
||||
delayChan <- time.Time{}
|
||||
doneChan <- struct{}{}
|
||||
h.VerifyLogMsg("Error constructing collation body request: could not fetch collation record from SMC: error fetching collation record")
|
||||
|
||||
simulator.cancel()
|
||||
|
||||
// The context should have been canceled.
|
||||
if simulator.ctx.Err() == nil {
|
||||
t.Error("Context was not canceled")
|
||||
}
|
||||
exitRoutine <- true
|
||||
h.VerifyLogMsg("Simulator context closed, exiting goroutine")
|
||||
}
|
||||
|
||||
// This test checks the proper functioning of the simulateNotaryRequests goroutine
|
||||
@@ -196,20 +199,20 @@ func TestSimulateNotaryRequests(t *testing.T) {
|
||||
}
|
||||
|
||||
simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
|
||||
simulator.errChan = make(chan error)
|
||||
delayChan := make(chan time.Time)
|
||||
doneChan := make(chan struct{})
|
||||
exitRoutine := make(chan bool)
|
||||
|
||||
go simulator.simulateNotaryRequests(&goodSMCCaller{}, &goodReader{}, delayChan)
|
||||
go func() {
|
||||
simulator.simulateNotaryRequests(&goodSMCCaller{}, &goodReader{}, delayChan, doneChan)
|
||||
<-exitRoutine
|
||||
}()
|
||||
|
||||
delayChan <- time.Time{}
|
||||
delayChan <- time.Time{}
|
||||
doneChan <- struct{}{}
|
||||
|
||||
h.VerifyLogMsg("Simulator context closed, exiting goroutine")
|
||||
h.VerifyLogMsg("Sent request for collation body via a shardp2p feed")
|
||||
|
||||
simulator.cancel()
|
||||
// The context should have been canceled.
|
||||
if simulator.ctx.Err() == nil {
|
||||
t.Error("Context was not canceled")
|
||||
}
|
||||
exitRoutine <- true
|
||||
h.VerifyLogMsg("Simulator context closed, exiting goroutine")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user