mirror of https://github.com/OffchainLabs/prysm.git
Simplify Goroutines for Better Testing (#216)

sharding: goroutine for better testing (#216)

Former-commit-id: 2c70ee0892b1e36d5b4473f1e5bba5f151ee449c [formerly 3d91ae5c4288ab27fbf09347d5b12164802726bc]
Former-commit-id: 24085acd2b045f549a3356ef0da219cb91149650
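The change this commit makes everywhere is the same one: goroutine loops no longer create their own timers, feeds, and error handlers internally; those are injected as parameters or struct fields so tests can drive each loop deterministically. A minimal standalone sketch of the pattern, not code from the commit (the poller type and names are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// poller stands in for services like the Simulator below: the tick
// source is injected rather than created inside the loop, so production
// code can pass time.Tick(delay) while a test passes a channel it
// controls by hand.
type poller struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// run does one unit of work per tick and exits when the context is canceled.
func (p *poller) run(delayChan <-chan time.Time, work func()) {
	for {
		select {
		case <-p.ctx.Done():
			return
		case <-delayChan:
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := &poller{ctx: ctx, cancel: cancel}

	// A test sends ticks explicitly, making every iteration deterministic.
	delayChan := make(chan time.Time)
	go p.run(delayChan, func() { fmt.Println("did one unit of work") })

	delayChan <- time.Time{} // triggers an iteration
	delayChan <- time.Time{} // completes only after the first iteration finished

	p.cancel()
}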
sharding/database (ShardDB):

@@ -12,17 +12,28 @@ import (
 )
 
 type ShardDB struct {
-	dataDir string
-	name    string
-	cache   int
-	handles int
-	db      *ethdb.LDBDatabase
+	inmemory bool
+	dataDir  string
+	name     string
+	cache    int
+	handles  int
+	db       ethdb.Database
 }
 
 // NewShardDB initializes a shardDB.
-func NewShardDB(dataDir string, name string) (*ShardDB, error) {
+func NewShardDB(dataDir string, name string, inmemory bool) (*ShardDB, error) {
 	// Uses default cache and handles values.
 	// TODO: allow these arguments to be set based on cli context.
+	if inmemory {
+		return &ShardDB{
+			inmemory: inmemory,
+			dataDir:  dataDir,
+			name:     name,
+			cache:    16,
+			handles:  16,
+			db:       NewShardKV(),
+		}, nil
+	}
 	return &ShardDB{
 		dataDir: dataDir,
 		name:    name,
@@ -35,12 +46,14 @@ func NewShardDB(dataDir string, name string) (*ShardDB, error) {
 // Start the shard DB service.
 func (s *ShardDB) Start() {
 	log.Info("Starting shardDB service")
-	db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles)
-	if err != nil {
-		log.Error(fmt.Sprintf("Could not start shard DB: %v", err))
-		return
+	if !s.inmemory {
+		db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles)
+		if err != nil {
+			log.Error(fmt.Sprintf("Could not start shard DB: %v", err))
+			return
+		}
+		s.db = db
 	}
-	s.db = db
 }
 
 // Stop the shard DB service gracefully.
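With the new inmemory flag, a test can back a ShardDB with the in-memory ShardKV instead of LevelDB on disk. A sketch of how a test would use it, assuming the package layout in this diff (TestInMemoryDB itself is hypothetical):

package database_test

import (
	"math/big"
	"testing"

	"github.com/ethereum/go-ethereum/sharding"
	"github.com/ethereum/go-ethereum/sharding/database"
)

// TestInMemoryDB is a hypothetical example of using the inmemory flag.
func TestInMemoryDB(t *testing.T) {
	// dataDir and name go unused in in-memory mode, so empty strings suffice.
	shardChainDB, err := database.NewShardDB("", "", true)
	if err != nil {
		t.Fatalf("unable to setup db: %v", err)
	}
	// No Start() is needed before use: the constructor assigns the
	// in-memory ShardKV, and Start() skips the LevelDB path when
	// inmemory is set.
	shard := sharding.NewShard(big.NewInt(0), shardChainDB.DB())
	if shard == nil {
		t.Fatal("expected a shard instance")
	}
}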
sharding/database (tests):

@@ -15,7 +15,7 @@ var _ = sharding.Service(&ShardDB{})
 var testDB *ShardDB
 
 func init() {
-	shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata")
+	shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata", false)
 	testDB = shardDB
 	testDB.Start()
 }
@@ -24,7 +24,7 @@ func TestLifecycle(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
-	s, err := NewShardDB("/tmp/datadir", "shardchaindb")
+	s, err := NewShardDB("/tmp/datadir", "shardchaindb", false)
 	if err != nil {
 		t.Fatalf("Could not initialize a new sb: %v", err)
 	}
sharding/database (ShardKV):

@@ -14,7 +14,7 @@ type ShardKV struct {
 	lock sync.RWMutex
 }
 
-// NewShardKV initializes a keyval store in memory.
+// NewShardKV creates an in-memory, key-value store.
 func NewShardKV() *ShardKV {
 	return &ShardKV{kv: make(map[common.Hash][]byte)}
 }
ShardEthereum service registration (node backend):

@@ -184,7 +184,7 @@ func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error {
 		path = ctx.GlobalString(utils.DataDirFlag.Name)
 	}
 	return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
-		return database.NewShardDB(path, shardChainDbName)
+		return database.NewShardDB(path, shardChainDbName, false)
 	})
 }
@@ -263,7 +263,7 @@ func (s *ShardEthereum) registerSimulatorService(config *params.Config, shardID
 		ctx.RetrieveService(&p2p)
 		var smcClient *mainchain.SMCClient
 		ctx.RetrieveService(&smcClient)
-		return simulator.NewSimulator(config, smcClient.ChainReader(), smcClient.SMCCaller(), p2p, shardID, 15) // 15 second delay between simulator requests.
+		return simulator.NewSimulator(config, smcClient, p2p, shardID, 15) // 15 second delay between simulator requests.
 	})
 }
@@ -275,6 +275,6 @@ func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int
 		ctx.RetrieveService(&smcClient)
 		var shardChainDB *database.ShardDB
 		ctx.RetrieveService(&shardChainDB)
-		return syncer.NewSyncer(config, smcClient, p2p, shardChainDB.DB(), shardID)
+		return syncer.NewSyncer(config, smcClient, p2p, shardChainDB, shardID)
 	})
 }
sharding/p2p (Server):

@@ -8,6 +8,12 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )
 
+// Sender represents a struct that is able to relay information via shardp2p.
+// Server implements this interface.
+type Sender interface {
+	Send(msg interface{}, peer Peer)
+}
+
 // Server is a placeholder for a p2p service. To be designed.
 type Server struct {
 	feeds map[reflect.Type]*event.Feed
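Depending on the narrow Sender interface instead of the concrete *p2p.Server makes it straightforward to substitute a recording double in tests. A hypothetical helper living inside package p2p, not part of this commit:

// mockSender records messages instead of relaying them over shardp2p.
// It satisfies the Sender interface above; a hypothetical test helper,
// not part of this commit.
type mockSender struct {
	sent []interface{}
}

// Send appends the message so a test can assert on what was relayed.
func (m *mockSender) Send(msg interface{}, peer Peer) {
	m.sent = append(m.sent, msg)
}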
Simulator service:

@@ -13,44 +13,42 @@ import (
 	"github.com/ethereum/go-ethereum/sharding/p2p/messages"
 	"github.com/ethereum/go-ethereum/sharding/params"
 	"github.com/ethereum/go-ethereum/sharding/syncer"
+	"github.com/ethereum/go-ethereum/sharding/utils"
 )
 
-// Simulator is a service in a shard node
-// that simulates requests from remote nodes coming over
-// the shardp2p network. For example, if we are running a
-// proposer service, we would want to simulate notary requests
-// coming to us via a p2p feed. This service will be removed once
-// p2p internals and end-to-end testing across remote nodes have been
-// implemented.
+// Simulator is a service in a shard node that simulates requests from
+// remote nodes coming over the shardp2p network. For example, if
+// we are running a proposer service, we would want to simulate notary
+// requests coming to us via a p2p feed. This service will be removed
+// once p2p internals and end-to-end testing across remote
+// nodes have been implemented.
 type Simulator struct {
 	config      *params.Config
-	reader      mainchain.Reader
-	fetcher     mainchain.RecordFetcher
+	client      *mainchain.SMCClient
 	p2p         *p2p.Server
 	shardID     int
 	ctx         context.Context
 	cancel      context.CancelFunc
-	errChan     chan error    // Useful channel for handling errors at the service layer.
-	delay       time.Duration // The delay (in seconds) between simulator requests sent via p2p.
-	requestSent chan int      // Useful channel for handling outgoing requests from the service.
+	errChan     chan error // Useful channel for handling errors at the service layer.
+	delay       time.Duration
+	requestFeed *event.Feed
 }
 
 // NewSimulator creates a struct instance of a simulator service.
 // It will have access to config, a mainchain client, a p2p server,
 // and a shardID.
-func NewSimulator(config *params.Config, reader mainchain.Reader, fetcher mainchain.RecordFetcher, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) {
+func NewSimulator(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	errChan := make(chan error)
-	requestSent := make(chan int)
-	return &Simulator{config, reader, fetcher, p2p, shardID, ctx, cancel, errChan, delay, requestSent}, nil
+	return &Simulator{config, client, p2p, shardID, ctx, cancel, errChan, delay, nil}, nil
 }
 
 // Start the main loop for simulating p2p requests.
 func (s *Simulator) Start() {
 	log.Info("Starting simulator service")
-	feed := s.p2p.Feed(messages.CollationBodyRequest{})
-	go s.simulateNotaryRequests(s.fetcher, s.reader, feed)
-	go s.handleServiceErrors()
+	s.requestFeed = s.p2p.Feed(messages.CollationBodyRequest{})
+	go utils.HandleServiceErrors(s.ctx.Done(), s.errChan)
+	go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay))
 }
 
 // Stop the main loop for simulator requests.
@@ -58,37 +56,24 @@ func (s *Simulator) Stop() error {
 	// Triggers a cancel call in the service's context which shuts down every goroutine
 	// in this service.
 	defer s.cancel()
+	defer close(s.errChan)
 	log.Warn("Stopping simulator service")
 	return nil
 }
 
-// handleServiceErrors manages a goroutine that listens for errors broadcast to
-// this service's error channel. This serves as a final step for error logging
-// and is stopped upon the service shutting down.
-func (s *Simulator) handleServiceErrors() {
-	for {
-		select {
-		case <-s.ctx.Done():
-			return
-		case err := <-s.errChan:
-			log.Error(fmt.Sprintf("Simulator service error: %v", err))
-		}
-	}
-}
-
 // simulateNotaryRequests simulates p2p messages sent out by notaries
 // once the system is in production. Notaries will be performing
 // this action within their own service when they are selected on a shard, period
 // pair to perform their responsibilities. This function in particular simulates
 // requests for collation bodies that will be relayed to the appropriate proposer
 // by the p2p feed layer.
-func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, feed *event.Feed) {
+func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, delayChan <-chan time.Time) {
 	for {
 		select {
 		// Makes sure to close this goroutine when the service stops.
 		case <-s.ctx.Done():
 			return
-		case <-time.After(time.Second * s.delay):
+		case <-delayChan:
 			blockNumber, err := reader.BlockByNumber(s.ctx, nil)
 			if err != nil {
 				s.errChan <- fmt.Errorf("could not fetch current block number: %v", err)
@@ -106,9 +91,8 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read
 				Peer: p2p.Peer{},
 				Data: *req,
 			}
-			feed.Send(msg)
+			s.requestFeed.Send(msg)
 			log.Info("Sent request for collation body via a shardp2p feed")
-			s.requestSent <- 1
 		}
 	}
 }
Simulator service tests:

@@ -9,6 +9,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ethereum/go-ethereum/sharding/mainchain"
+
 	ethereum "github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
@@ -90,15 +92,11 @@ func TestStartStop(t *testing.T) {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0)
+	simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
 	if err != nil {
 		t.Fatalf("Unable to setup simulator service: %v", err)
 	}
 
-	simulator.Start()
-
-	h.VerifyLogMsg("Starting simulator service")
-
 	if err := simulator.Stop(); err != nil {
 		t.Fatalf("Unable to stop simulator service: %v", err)
 	}
@@ -115,34 +113,29 @@ func TestStartStop(t *testing.T) {
 // in the simulateNotaryRequests goroutine when reading the block number from
 // the mainchain via RPC.
 func TestSimulateNotaryRequests_FaultyReader(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
 	shardID := 0
 	server, err := p2p.NewServer()
 	if err != nil {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	simulator, err := NewSimulator(params.DefaultConfig, &faultyReader{}, &goodSMCCaller{}, server, shardID, 0)
+	simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
 	if err != nil {
 		t.Fatalf("Unable to setup simulator service: %v", err)
 	}
 
-	feed := server.Feed(messages.CollationBodyRequest{})
+	simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
+	simulator.errChan = make(chan error)
 
-	faultyReader := &faultyReader{}
-	go simulator.simulateNotaryRequests(&goodSMCCaller{}, faultyReader, feed)
+	go simulator.simulateNotaryRequests(&goodSMCCaller{}, &faultyReader{}, time.After(time.Second*0))
 
 	receivedErr := <-simulator.errChan
 	expectedErr := "could not fetch current block number"
 	if !strings.Contains(receivedErr.Error(), expectedErr) {
 		t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
 	}
-	if err := simulator.Stop(); err != nil {
-		t.Fatalf("Unable to stop simulator service: %v", err)
-	}
-	h.VerifyLogMsg("Stopping simulator service")
 
 	simulator.cancel()
 
 	// The context should have been canceled.
 	if simulator.ctx.Err() == nil {
@@ -154,34 +147,29 @@ func TestSimulateNotaryRequests_FaultyReader(t *testing.T) {
 // in the simulateNotaryRequests goroutine when reading the collation records
 // from the SMC.
 func TestSimulateNotaryRequests_FaultyCaller(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
 	shardID := 0
 	server, err := p2p.NewServer()
 	if err != nil {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &faultySMCCaller{}, server, shardID, 0)
+	simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
 	if err != nil {
 		t.Fatalf("Unable to setup simulator service: %v", err)
 	}
 
-	feed := server.Feed(messages.CollationBodyRequest{})
-	reader := &goodReader{}
+	simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
+	simulator.errChan = make(chan error)
 
-	go simulator.simulateNotaryRequests(&faultySMCCaller{}, reader, feed)
+	go simulator.simulateNotaryRequests(&faultySMCCaller{}, &goodReader{}, time.After(time.Second*0))
 
 	receivedErr := <-simulator.errChan
 	expectedErr := "error constructing collation body request"
 	if !strings.Contains(receivedErr.Error(), expectedErr) {
 		t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
 	}
-	if err := simulator.Stop(); err != nil {
-		t.Fatalf("Unable to stop simulator service: %v", err)
-	}
-	h.VerifyLogMsg("Stopping simulator service")
 
 	simulator.cancel()
 
 	// The context should have been canceled.
 	if simulator.ctx.Err() == nil {
@@ -202,66 +190,25 @@ func TestSimulateNotaryRequests(t *testing.T) {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0)
+	simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
 	if err != nil {
 		t.Fatalf("Unable to setup simulator service: %v", err)
 	}
 
-	feed := server.Feed(messages.CollationBodyRequest{})
-	reader := &goodReader{}
+	simulator.requestFeed = server.Feed(messages.CollationBodyRequest{})
+	simulator.errChan = make(chan error)
+	delayChan := make(chan time.Time)
 
-	go simulator.simulateNotaryRequests(&goodSMCCaller{}, reader, feed)
+	go simulator.simulateNotaryRequests(&goodSMCCaller{}, &goodReader{}, delayChan)
 
-	<-simulator.requestSent
+	delayChan <- time.Time{}
+	delayChan <- time.Time{}
+
 	h.VerifyLogMsg("Sent request for collation body via a shardp2p feed")
 
 	simulator.cancel()
 	// The context should have been canceled.
 	if simulator.ctx.Err() == nil {
-		t.Fatal("Context was not canceled")
+		t.Error("Context was not canceled")
 	}
 }
-
-// TODO: Move this to the utils package along with the handleServiceErrors
-// function.
-func TestHandleServiceErrors(t *testing.T) {
-	h := internal.NewLogHandler(t)
-	log.Root().SetHandler(h)
-
-	shardID := 0
-	server, err := p2p.NewServer()
-	if err != nil {
-		t.Fatalf("Unable to setup p2p server: %v", err)
-	}
-
-	simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0)
-	if err != nil {
-		t.Fatalf("Unable to setup simulator service: %v", err)
-	}
-
-	go simulator.handleServiceErrors()
-
-	expectedErr := "testing the error channel"
-	complete := make(chan int)
-
-	go func() {
-		for {
-			select {
-			case <-simulator.ctx.Done():
-				return
-			default:
-				simulator.errChan <- errors.New(expectedErr)
-				complete <- 1
-			}
-		}
-	}()
-
-	<-complete
-	simulator.cancel()
-
-	// The context should have been canceled.
-	if simulator.ctx.Err() == nil {
-		t.Fatal("Context was not canceled")
-	}
-	time.Sleep(time.Millisecond * 500)
-	h.VerifyLogMsg(fmt.Sprintf("Simulator service error: %v", expectedErr))
-}
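A note on the two back-to-back delayChan sends in TestSimulateNotaryRequests above: on an unbuffered channel, the second send cannot complete until the goroutine has looped back to its receive, which guarantees the work triggered by the first tick has finished before the test asserts on the log. A standalone illustration, not code from the commit:

package main

import (
	"fmt"
	"time"
)

// The second send on an unbuffered channel only completes once the
// worker has returned to its receive, i.e. after the work triggered by
// the first send is done.
func main() {
	delayChan := make(chan time.Time)
	done := make(chan struct{})

	go func() {
		for range delayChan {
			fmt.Println("processed one simulated tick")
		}
		close(done)
	}()

	delayChan <- time.Time{} // triggers one iteration
	delayChan <- time.Time{} // blocks until the first iteration finished

	close(delayChan)
	<-done
}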
sharding/syncer (Syncer):

@@ -2,17 +2,19 @@ package syncer
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/big"
 
-	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/sharding"
+	"github.com/ethereum/go-ethereum/sharding/database"
 	"github.com/ethereum/go-ethereum/sharding/mainchain"
 	"github.com/ethereum/go-ethereum/sharding/p2p"
 	"github.com/ethereum/go-ethereum/sharding/p2p/messages"
 	"github.com/ethereum/go-ethereum/sharding/params"
+	"github.com/ethereum/go-ethereum/sharding/utils"
 )
 
 // Syncer represents a service that provides handlers for shard chain
@@ -21,31 +23,36 @@ import (
 // items such as transactions and in future sharding iterations: state.
 type Syncer struct {
 	config       *params.Config
-	signer       mainchain.Signer
-	shard        *sharding.Shard
+	client       *mainchain.SMCClient
+	shardID      int
+	shardChainDB *database.ShardDB
 	p2p          *p2p.Server
 	ctx          context.Context
 	cancel       context.CancelFunc
-	errChan      chan error // Useful channel for handling errors at the service layer.
-	responseSent chan int   // Useful channel for handling outgoing responses from the service.
+	msgChan      chan p2p.Message
+	bodyRequests event.Subscription
+	errChan      chan error // Useful channel for handling errors at the service layer.
 }
 
 // NewSyncer creates a struct instance of a syncer service.
 // It will have access to config, a signer, a p2p server,
 // a shardChainDb, and a shardID.
-func NewSyncer(config *params.Config, signer mainchain.Signer, p2p *p2p.Server, shardChainDB ethdb.Database, shardID int) (*Syncer, error) {
+func NewSyncer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardChainDB *database.ShardDB, shardID int) (*Syncer, error) {
 	ctx, cancel := context.WithCancel(context.Background())
-	shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
 	errChan := make(chan error)
-	responseSent := make(chan int)
-	return &Syncer{config, signer, shard, p2p, ctx, cancel, errChan, responseSent}, nil
+	return &Syncer{config, client, shardID, shardChainDB, p2p, ctx, cancel, nil, nil, errChan}, nil
 }
 
 // Start the main loop for handling shard chain data requests.
 func (s *Syncer) Start() {
 	log.Info("Starting sync service")
-	go s.handleCollationBodyRequests(s.signer, s.p2p.Feed(messages.CollationBodyRequest{}))
-	go s.handleServiceErrors()
+
+	shard := sharding.NewShard(big.NewInt(int64(s.shardID)), s.shardChainDB.DB())
+
+	s.msgChan = make(chan p2p.Message, 100)
+	s.bodyRequests = s.p2p.Feed(messages.CollationBodyRequest{}).Subscribe(s.msgChan)
+	go s.handleCollationBodyRequests(s.client, shard)
+	go utils.HandleServiceErrors(s.ctx.Done(), s.errChan)
 }
 
 // Stop the main loop.
@@ -53,43 +60,26 @@ func (s *Syncer) Stop() error {
 	// Triggers a cancel call in the service's context which shuts down every goroutine
 	// in this service.
 	defer s.cancel()
+	defer close(s.errChan)
+	defer close(s.msgChan)
 	log.Warn("Stopping sync service")
+	s.bodyRequests.Unsubscribe()
 	return nil
 }
 
-// handleServiceErrors manages a goroutine that listens for errors broadcast to
-// this service's error channel. This serves as a final step for error logging
-// and is stopped upon the service shutting down.
-func (s *Syncer) handleServiceErrors() {
-	for {
-		select {
-		case <-s.ctx.Done():
-			return
-		case err := <-s.errChan:
-			log.Error(fmt.Sprintf("Sync service error: %v", err))
-		}
-	}
-}
-
 // handleCollationBodyRequests subscribes to messages from the shardp2p
 // network and responds to a specific peer that requested the body using
 // the Send method exposed by the p2p server's API (implementing the p2p.Sender interface).
-func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, feed *event.Feed) {
-
-	ch := make(chan p2p.Message, 100)
-	sub := feed.Subscribe(ch)
-
-	defer sub.Unsubscribe()
-
+func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, collationFetcher sharding.CollationFetcher) {
 	for {
 		select {
 		// Makes sure to close this goroutine when the service stops.
 		case <-s.ctx.Done():
 			return
-		case req := <-ch:
+		case req := <-s.msgChan:
 			if req.Data != nil {
 				log.Info(fmt.Sprintf("Received p2p request of type: %T", req))
-				res, err := RespondCollationBody(req, signer, s.shard)
+				res, err := RespondCollationBody(req, signer, collationFetcher)
 				if err != nil {
 					s.errChan <- fmt.Errorf("could not construct response: %v", err)
 					continue
@@ -98,8 +88,10 @@ func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, feed *even
 				// Reply to that specific peer only.
 				s.p2p.Send(*res, req.Peer)
 				log.Info(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", res.HeaderHash.Hex()))
-				s.responseSent <- 1
 			}
+		case <-s.bodyRequests.Err():
+			s.errChan <- errors.New("subscriber failed")
+			return
 		}
 	}
 }
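The syncer now subscribes its message channel to the feed in Start and selects on the subscription's Err() channel, so tests can set msgChan, errChan, and bodyRequests by hand and push messages straight in, as the updated tests below do. A minimal standalone demonstration of the go-ethereum event.Feed wiring involved (illustrative, not code from the commit):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

// Start subscribes a buffered channel to the feed, the handler loop
// drains the channel, and sub.Err() reports subscription failure.
func main() {
	var feed event.Feed
	msgChan := make(chan int, 100) // buffered, like s.msgChan in the syncer

	sub := feed.Subscribe(msgChan)
	defer sub.Unsubscribe()

	nsent := feed.Send(42) // delivered to every subscribed channel
	fmt.Printf("delivered to %d subscriber(s): %d\n", nsent, <-msgChan)
}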
sharding/syncer (tests):

@@ -1,12 +1,13 @@
 package syncer
 
 import (
-	"errors"
 	"fmt"
 	"math/big"
 	"strings"
 	"testing"
-	"time"
+
+	"github.com/ethereum/go-ethereum/sharding/mainchain"
+	"github.com/ethereum/go-ethereum/sharding/params"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -19,30 +20,33 @@ import (
 	"github.com/ethereum/go-ethereum/sharding/database"
 	internal "github.com/ethereum/go-ethereum/sharding/internal"
 	"github.com/ethereum/go-ethereum/sharding/p2p"
-	"github.com/ethereum/go-ethereum/sharding/params"
 )
 
 var _ = sharding.Service(&Syncer{})
 
-func TestStartStop(t *testing.T) {
+func TestStop(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
-	shardChainDB := database.NewShardKV()
+	shardChainDB, err := database.NewShardDB("", "", true)
+	if err != nil {
+		t.Fatalf("unable to setup db: %v", err)
+	}
 	shardID := 0
 	server, err := p2p.NewServer()
 	if err != nil {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID)
+	syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID)
 	if err != nil {
 		t.Fatalf("Unable to setup sync service: %v", err)
 	}
 
-	syncer.Start()
-
-	h.VerifyLogMsg("Starting sync service")
+	feed := server.Feed(messages.CollationBodyRequest{})
+	syncer.msgChan = make(chan p2p.Message)
+	syncer.errChan = make(chan error)
+	syncer.bodyRequests = feed.Subscribe(syncer.msgChan)
 
 	if err := syncer.Stop(); err != nil {
 		t.Fatalf("Unable to stop sync service: %v", err)
@@ -63,46 +67,46 @@ func TestHandleCollationBodyRequests_FaultySigner(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
-	shardChainDB := database.NewShardKV()
+	shardChainDB, err := database.NewShardDB("", "", true)
+	if err != nil {
+		t.Fatalf("unable to setup db: %v", err)
+	}
 	shardID := 0
 	server, err := p2p.NewServer()
 	if err != nil {
 		t.Fatalf("Unable to setup p2p server: %v", err)
 	}
 
-	syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID)
+	syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID)
 	if err != nil {
 		t.Fatalf("Unable to setup syncer service: %v", err)
 	}
 
 	feed := server.Feed(messages.CollationBodyRequest{})
+	shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB.DB())
 
-	go syncer.handleCollationBodyRequests(&faultySigner{}, feed)
+	syncer.msgChan = make(chan p2p.Message)
+	syncer.errChan = make(chan error)
+	syncer.bodyRequests = feed.Subscribe(syncer.msgChan)
 
-	go func() {
-		for {
-			select {
-			case <-syncer.ctx.Done():
-				return
-			default:
-				msg := p2p.Message{
-					Peer: p2p.Peer{},
-					Data: messages.CollationBodyRequest{},
-				}
-				feed.Send(msg)
-			}
-		}
-	}()
+	go syncer.handleCollationBodyRequests(&faultySigner{}, shard)
 
+	msg := p2p.Message{
+		Peer: p2p.Peer{},
+		Data: messages.CollationBodyRequest{},
+	}
+	syncer.msgChan <- msg
 	receivedErr := <-syncer.errChan
 	expectedErr := "could not construct response"
 	if !strings.Contains(receivedErr.Error(), expectedErr) {
 		t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
 	}
 
 	syncer.cancel()
 
 	// The context should have been canceled.
 	if syncer.ctx.Err() == nil {
-		t.Fatal("Context was not canceled")
+		t.Error("Context was not canceled")
 	}
 }
@@ -113,7 +117,10 @@ func TestHandleCollationBodyRequests(t *testing.T) {
 	h := internal.NewLogHandler(t)
 	log.Root().SetHandler(h)
 
-	shardChainDB := database.NewShardKV()
+	shardChainDB, err := database.NewShardDB("", "", true)
+	if err != nil {
+		t.Fatalf("unable to setup db: %v", err)
+	}
 	server, err := p2p.NewServer()
 	if err != nil {
 		t.Fatalf("Unable to setup p2p server: %v", err)
@@ -138,94 +145,42 @@ func TestHandleCollationBodyRequests(t *testing.T) {
 	// Stores the collation into the inmemory kv store shardChainDB.
 	collation := sharding.NewCollation(header, body, nil)
 
-	shard := sharding.NewShard(shardID, shardChainDB)
+	shard := sharding.NewShard(shardID, shardChainDB.DB())
 
 	if err := shard.SaveCollation(collation); err != nil {
 		t.Fatalf("Could not store collation in shardChainDB: %v", err)
 	}
 
-	syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, 0)
+	syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, 0)
 	if err != nil {
 		t.Fatalf("Unable to setup syncer service: %v", err)
 	}
 
 	feed := server.Feed(messages.CollationBodyRequest{})
 
-	go syncer.handleCollationBodyRequests(&mockSigner{}, feed)
+	syncer.msgChan = make(chan p2p.Message)
+	syncer.errChan = make(chan error)
+	syncer.bodyRequests = feed.Subscribe(syncer.msgChan)
 
-	go func() {
-		for {
-			select {
-			case <-syncer.ctx.Done():
-				return
-			default:
-				msg := p2p.Message{
-					Peer: p2p.Peer{},
-					Data: messages.CollationBodyRequest{
-						ChunkRoot: &chunkRoot,
-						ShardID:   shardID,
-						Period:    period,
-						Proposer:  &proposerAddress,
-					},
-				}
-				feed.Send(msg)
-			}
-		}
-	}()
+	go syncer.handleCollationBodyRequests(&mockSigner{}, shard)
 
+	msg := p2p.Message{
+		Peer: p2p.Peer{},
+		Data: messages.CollationBodyRequest{
+			ChunkRoot: &chunkRoot,
+			ShardID:   shardID,
+			Period:    period,
+			Proposer:  &proposerAddress,
+		},
+	}
+	syncer.msgChan <- msg
 
-	<-syncer.responseSent
 	h.VerifyLogMsg(fmt.Sprintf("Received p2p request of type: %T", p2p.Message{}))
 	h.VerifyLogMsg(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", header.Hash().Hex()))
 
 	syncer.cancel()
 	// The context should have been canceled.
 	if syncer.ctx.Err() == nil {
-		t.Fatal("Context was not canceled")
+		t.Error("Context was not canceled")
 	}
 }
-
-// TODO: Move this to the utils package along with the handleServiceErrors
-// function.
-func TestHandleServiceErrors(t *testing.T) {
-
-	h := internal.NewLogHandler(t)
-	log.Root().SetHandler(h)
-
-	shardChainDB := database.NewShardKV()
-	shardID := 0
-	server, err := p2p.NewServer()
-	if err != nil {
-		t.Fatalf("Unable to setup p2p server: %v", err)
-	}
-
-	syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID)
-	if err != nil {
-		t.Fatalf("Unable to setup syncer service: %v", err)
-	}
-
-	go syncer.handleServiceErrors()
-
-	expectedErr := "testing the error channel"
-	complete := make(chan int)
-
-	go func() {
-		for {
-			select {
-			case <-syncer.ctx.Done():
-				return
-			default:
-				syncer.errChan <- errors.New(expectedErr)
-				complete <- 1
-			}
-		}
-	}()
-
-	<-complete
-	syncer.cancel()
-
-	// The context should have been canceled.
-	if syncer.ctx.Err() == nil {
-		t.Fatal("Context was not canceled")
-	}
-	time.Sleep(time.Millisecond * 500)
-	h.VerifyLogMsg(fmt.Sprintf("Sync service error: %v", expectedErr))
-}
sharding/utils/service.go (new file, 19 lines):
@@ -0,0 +1,19 @@
+package utils
+
+import (
+	"github.com/ethereum/go-ethereum/log"
+)
+
+// HandleServiceErrors manages a goroutine that listens for errors broadcast to
+// this service's error channel. This serves as a final step for error logging
+// and is stopped upon the service shutting down.
+func HandleServiceErrors(done <-chan struct{}, errChan <-chan error) {
+	for {
+		select {
+		case <-done:
+			return
+		case err := <-errChan:
+			log.Error(err.Error())
+		}
+	}
+}
sharding/utils/service_test.go (new file, 22 lines):
@@ -0,0 +1,22 @@
+package utils
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/sharding/internal"
+)
+
+func TestHandleServiceErrors(t *testing.T) {
+	h := internal.NewLogHandler(t)
+	log.Root().SetHandler(h)
+	done := make(chan struct{})
+	errChan := make(chan error)
+
+	go HandleServiceErrors(done, errChan)
+
+	errChan <- errors.New("something wrong")
+	done <- struct{}{}
+	h.VerifyLogMsg("something wrong")
+}