diff --git a/sharding/database/database.go b/sharding/database/database.go index 7d5897986e..2964a7e9fb 100644 --- a/sharding/database/database.go +++ b/sharding/database/database.go @@ -12,17 +12,28 @@ import ( ) type ShardDB struct { - dataDir string - name string - cache int - handles int - db *ethdb.LDBDatabase + inmemory bool + dataDir string + name string + cache int + handles int + db ethdb.Database } // NewShardDB initializes a shardDB. -func NewShardDB(dataDir string, name string) (*ShardDB, error) { +func NewShardDB(dataDir string, name string, inmemory bool) (*ShardDB, error) { // Uses default cache and handles values. // TODO: allow these arguments to be set based on cli context. + if inmemory { + return &ShardDB{ + inmemory: inmemory, + dataDir: dataDir, + name: name, + cache: 16, + handles: 16, + db: NewShardKV(), + }, nil + } return &ShardDB{ dataDir: dataDir, name: name, @@ -35,12 +46,14 @@ func NewShardDB(dataDir string, name string) (*ShardDB, error) { // Start the shard DB service. func (s *ShardDB) Start() { log.Info("Starting shardDB service") - db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles) - if err != nil { - log.Error(fmt.Sprintf("Could not start shard DB: %v", err)) - return + if !s.inmemory { + db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles) + if err != nil { + log.Error(fmt.Sprintf("Could not start shard DB: %v", err)) + return + } + s.db = db } - s.db = db } // Stop the shard DB service gracefully. diff --git a/sharding/database/database_test.go b/sharding/database/database_test.go index eaefa2024c..026ce0925f 100644 --- a/sharding/database/database_test.go +++ b/sharding/database/database_test.go @@ -15,7 +15,7 @@ var _ = sharding.Service(&ShardDB{}) var testDB *ShardDB func init() { - shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata") + shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata", false) testDB = shardDB testDB.Start() } @@ -24,7 +24,7 @@ func TestLifecycle(t *testing.T) { h := internal.NewLogHandler(t) log.Root().SetHandler(h) - s, err := NewShardDB("/tmp/datadir", "shardchaindb") + s, err := NewShardDB("/tmp/datadir", "shardchaindb", false) if err != nil { t.Fatalf("Could not initialize a new sb: %v", err) } diff --git a/sharding/database/inmemory.go b/sharding/database/inmemory.go index 448951b5fe..7b703c91ff 100644 --- a/sharding/database/inmemory.go +++ b/sharding/database/inmemory.go @@ -14,7 +14,7 @@ type ShardKV struct { lock sync.RWMutex } -// NewShardKV initializes a keyval store in memory. +// NewShardKV creates an in-memory, key-value store. func NewShardKV() *ShardKV { return &ShardKV{kv: make(map[common.Hash][]byte)} } diff --git a/sharding/node/backend.go b/sharding/node/backend.go index ff91ad8960..1913c0e921 100644 --- a/sharding/node/backend.go +++ b/sharding/node/backend.go @@ -184,7 +184,7 @@ func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error { path = ctx.GlobalString(utils.DataDirFlag.Name) } return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) { - return database.NewShardDB(path, shardChainDbName) + return database.NewShardDB(path, shardChainDbName, false) }) } @@ -263,7 +263,7 @@ func (s *ShardEthereum) registerSimulatorService(config *params.Config, shardID ctx.RetrieveService(&p2p) var smcClient *mainchain.SMCClient ctx.RetrieveService(&smcClient) - return simulator.NewSimulator(config, smcClient.ChainReader(), smcClient.SMCCaller(), p2p, shardID, 15) // 15 second delay between simulator requests. + return simulator.NewSimulator(config, smcClient, p2p, shardID, 15) // 15 second delay between simulator requests. }) } @@ -275,6 +275,6 @@ func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int ctx.RetrieveService(&smcClient) var shardChainDB *database.ShardDB ctx.RetrieveService(&shardChainDB) - return syncer.NewSyncer(config, smcClient, p2p, shardChainDB.DB(), shardID) + return syncer.NewSyncer(config, smcClient, p2p, shardChainDB, shardID) }) } diff --git a/sharding/p2p/service.go b/sharding/p2p/service.go index b73b7580b8..9280902248 100644 --- a/sharding/p2p/service.go +++ b/sharding/p2p/service.go @@ -8,6 +8,12 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// Sender represents a struct that is able to relay information via shardp2p. +// Server implements this interface. +type Sender interface { + Send(msg interface{}, peer Peer) +} + // Server is a placeholder for a p2p service. To be designed. type Server struct { feeds map[reflect.Type]*event.Feed diff --git a/sharding/simulator/service.go b/sharding/simulator/service.go index cbcfe82f4b..8ac0f27b94 100644 --- a/sharding/simulator/service.go +++ b/sharding/simulator/service.go @@ -13,44 +13,42 @@ import ( "github.com/ethereum/go-ethereum/sharding/p2p/messages" "github.com/ethereum/go-ethereum/sharding/params" "github.com/ethereum/go-ethereum/sharding/syncer" + "github.com/ethereum/go-ethereum/sharding/utils" ) -// Simulator is a service in a shard node -// that simulates requests from remote notes coming over -// the shardp2p network. For example, if we are running a -// proposer service, we would want to simulate notary requests -// coming to us via a p2p feed. This service will be removed once -// p2p internals and end-to-end testing across remote nodes have been -// implemented. +// Simulator is a service in a shard node that simulates requests from +// remote notes coming over the shardp2p network. For example, if +// we are running a proposer service, we would want to simulate notary requests +// requests coming to us via a p2p feed. This service will be removed +// once p2p internals and end-to-end testing across remote +// nodes have been implemented. type Simulator struct { config *params.Config - reader mainchain.Reader - fetcher mainchain.RecordFetcher + client *mainchain.SMCClient p2p *p2p.Server shardID int ctx context.Context cancel context.CancelFunc - errChan chan error // Useful channel for handling errors at the service layer. - delay time.Duration // The delay (in seconds) between simulator requests sent via p2p. - requestSent chan int // Useful channel for handling outgoing requests from the service. + errChan chan error // Useful channel for handling errors at the service layer. + delay time.Duration + requestFeed *event.Feed } // NewSimulator creates a struct instance of a simulator service. // It will have access to config, a mainchain client, a p2p server, // and a shardID. -func NewSimulator(config *params.Config, reader mainchain.Reader, fetcher mainchain.RecordFetcher, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) { +func NewSimulator(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) { ctx, cancel := context.WithCancel(context.Background()) errChan := make(chan error) - requestSent := make(chan int) - return &Simulator{config, reader, fetcher, p2p, shardID, ctx, cancel, errChan, delay, requestSent}, nil + return &Simulator{config, client, p2p, shardID, ctx, cancel, errChan, delay, nil}, nil } // Start the main loop for simulating p2p requests. func (s *Simulator) Start() { log.Info("Starting simulator service") - feed := s.p2p.Feed(messages.CollationBodyRequest{}) - go s.simulateNotaryRequests(s.fetcher, s.reader, feed) - go s.handleServiceErrors() + s.requestFeed = s.p2p.Feed(messages.CollationBodyRequest{}) + go utils.HandleServiceErrors(s.ctx.Done(), s.errChan) + go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay)) } // Stop the main loop for simulator requests. @@ -58,37 +56,24 @@ func (s *Simulator) Stop() error { // Triggers a cancel call in the service's context which shuts down every goroutine // in this service. defer s.cancel() + defer close(s.errChan) log.Warn("Stopping simulator service") return nil } -// handleServiceErrors manages a goroutine that listens for errors broadcast to -// this service's error channel. This serves as a final step for error logging -// and is stopped upon the service shutting down. -func (s *Simulator) handleServiceErrors() { - for { - select { - case <-s.ctx.Done(): - return - case err := <-s.errChan: - log.Error(fmt.Sprintf("Simulator service error: %v", err)) - } - } -} - // simulateNotaryRequests simulates p2p message sent out by notaries // once the system is in production. Notaries will be performing // this action within their own service when they are selected on a shard, period // pair to perform their responsibilities. This function in particular simulates // requests for collation bodies that will be relayed to the appropriate proposer // by the p2p feed layer. -func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, feed *event.Feed) { +func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, delayChan <-chan time.Time) { for { select { // Makes sure to close this goroutine when the service stops. case <-s.ctx.Done(): return - case <-time.After(time.Second * s.delay): + case <-delayChan: blockNumber, err := reader.BlockByNumber(s.ctx, nil) if err != nil { s.errChan <- fmt.Errorf("could not fetch current block number: %v", err) @@ -106,9 +91,8 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read Peer: p2p.Peer{}, Data: *req, } - feed.Send(msg) + s.requestFeed.Send(msg) log.Info("Sent request for collation body via a shardp2p feed") - s.requestSent <- 1 } } } diff --git a/sharding/simulator/service_test.go b/sharding/simulator/service_test.go index a684792049..d2559e854c 100644 --- a/sharding/simulator/service_test.go +++ b/sharding/simulator/service_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/sharding/mainchain" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -90,15 +92,11 @@ func TestStartStop(t *testing.T) { t.Fatalf("Unable to setup p2p server: %v", err) } - simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0) + simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0) if err != nil { t.Fatalf("Unable to setup simulator service: %v", err) } - simulator.Start() - - h.VerifyLogMsg("Starting simulator service") - if err := simulator.Stop(); err != nil { t.Fatalf("Unable to stop simulator service: %v", err) } @@ -115,34 +113,29 @@ func TestStartStop(t *testing.T) { // in the simulateNotaryRequests goroutine when reading the block number from // the mainchain via RPC. func TestSimulateNotaryRequests_FaultyReader(t *testing.T) { - h := internal.NewLogHandler(t) - log.Root().SetHandler(h) - shardID := 0 server, err := p2p.NewServer() if err != nil { t.Fatalf("Unable to setup p2p server: %v", err) } - simulator, err := NewSimulator(params.DefaultConfig, &faultyReader{}, &goodSMCCaller{}, server, shardID, 0) + simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0) if err != nil { t.Fatalf("Unable to setup simulator service: %v", err) } - feed := server.Feed(messages.CollationBodyRequest{}) + simulator.requestFeed = server.Feed(messages.CollationBodyRequest{}) + simulator.errChan = make(chan error) - faultyReader := &faultyReader{} - go simulator.simulateNotaryRequests(&goodSMCCaller{}, faultyReader, feed) + go simulator.simulateNotaryRequests(&goodSMCCaller{}, &faultyReader{}, time.After(time.Second*0)) receivedErr := <-simulator.errChan expectedErr := "could not fetch current block number" if !strings.Contains(receivedErr.Error(), expectedErr) { t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr) } - if err := simulator.Stop(); err != nil { - t.Fatalf("Unable to stop simulator service: %v", err) - } - h.VerifyLogMsg("Stopping simulator service") + + simulator.cancel() // The context should have been canceled. if simulator.ctx.Err() == nil { @@ -154,34 +147,29 @@ func TestSimulateNotaryRequests_FaultyReader(t *testing.T) { // in the simulateNotaryRequests goroutine when reading the collation records // from the SMC. func TestSimulateNotaryRequests_FaultyCaller(t *testing.T) { - h := internal.NewLogHandler(t) - log.Root().SetHandler(h) - shardID := 0 server, err := p2p.NewServer() if err != nil { t.Fatalf("Unable to setup p2p server: %v", err) } - simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &faultySMCCaller{}, server, shardID, 0) + simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0) if err != nil { t.Fatalf("Unable to setup simulator service: %v", err) } - feed := server.Feed(messages.CollationBodyRequest{}) - reader := &goodReader{} + simulator.requestFeed = server.Feed(messages.CollationBodyRequest{}) + simulator.errChan = make(chan error) - go simulator.simulateNotaryRequests(&faultySMCCaller{}, reader, feed) + go simulator.simulateNotaryRequests(&faultySMCCaller{}, &goodReader{}, time.After(time.Second*0)) receivedErr := <-simulator.errChan expectedErr := "error constructing collation body request" if !strings.Contains(receivedErr.Error(), expectedErr) { t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr) } - if err := simulator.Stop(); err != nil { - t.Fatalf("Unable to stop simulator service: %v", err) - } - h.VerifyLogMsg("Stopping simulator service") + + simulator.cancel() // The context should have been canceled. if simulator.ctx.Err() == nil { @@ -202,66 +190,25 @@ func TestSimulateNotaryRequests(t *testing.T) { t.Fatalf("Unable to setup p2p server: %v", err) } - simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0) + simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0) if err != nil { t.Fatalf("Unable to setup simulator service: %v", err) } - feed := server.Feed(messages.CollationBodyRequest{}) - reader := &goodReader{} + simulator.requestFeed = server.Feed(messages.CollationBodyRequest{}) + simulator.errChan = make(chan error) + delayChan := make(chan time.Time) - go simulator.simulateNotaryRequests(&goodSMCCaller{}, reader, feed) + go simulator.simulateNotaryRequests(&goodSMCCaller{}, &goodReader{}, delayChan) + + delayChan <- time.Time{} + delayChan <- time.Time{} - <-simulator.requestSent h.VerifyLogMsg("Sent request for collation body via a shardp2p feed") + simulator.cancel() // The context should have been canceled. if simulator.ctx.Err() == nil { - t.Fatal("Context was not canceled") + t.Error("Context was not canceled") } } - -// TODO: Move this to the utils package along with the handleServiceErrors -// function. -func TestHandleServiceErrors(t *testing.T) { - h := internal.NewLogHandler(t) - log.Root().SetHandler(h) - - shardID := 0 - server, err := p2p.NewServer() - if err != nil { - t.Fatalf("Unable to setup p2p server: %v", err) - } - - simulator, err := NewSimulator(params.DefaultConfig, &goodReader{}, &goodSMCCaller{}, server, shardID, 0) - if err != nil { - t.Fatalf("Unable to setup simulator service: %v", err) - } - - go simulator.handleServiceErrors() - - expectedErr := "testing the error channel" - complete := make(chan int) - - go func() { - for { - select { - case <-simulator.ctx.Done(): - return - default: - simulator.errChan <- errors.New(expectedErr) - complete <- 1 - } - } - }() - - <-complete - simulator.cancel() - - // The context should have been canceled. - if simulator.ctx.Err() == nil { - t.Fatal("Context was not canceled") - } - time.Sleep(time.Millisecond * 500) - h.VerifyLogMsg(fmt.Sprintf("Simulator service error: %v", expectedErr)) -} diff --git a/sharding/syncer/service.go b/sharding/syncer/service.go index f62cf15926..8ca70919e3 100644 --- a/sharding/syncer/service.go +++ b/sharding/syncer/service.go @@ -2,17 +2,19 @@ package syncer import ( "context" + "errors" "fmt" "math/big" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/sharding" + "github.com/ethereum/go-ethereum/sharding/database" "github.com/ethereum/go-ethereum/sharding/mainchain" "github.com/ethereum/go-ethereum/sharding/p2p" "github.com/ethereum/go-ethereum/sharding/p2p/messages" "github.com/ethereum/go-ethereum/sharding/params" + "github.com/ethereum/go-ethereum/sharding/utils" ) // Syncer represents a service that provides handlers for shard chain @@ -21,31 +23,36 @@ import ( // items such as transactions and in future sharding iterations: state. type Syncer struct { config *params.Config - signer mainchain.Signer - shard *sharding.Shard + client *mainchain.SMCClient + shardID int + shardChainDB *database.ShardDB p2p *p2p.Server ctx context.Context cancel context.CancelFunc + msgChan chan p2p.Message + bodyRequests event.Subscription errChan chan error // Useful channel for handling errors at the service layer. - responseSent chan int // Useful channel for handling outgoing responses from the service. } // NewSyncer creates a struct instance of a syncer service. // It will have access to config, a signer, a p2p server, // a shardChainDb, and a shardID. -func NewSyncer(config *params.Config, signer mainchain.Signer, p2p *p2p.Server, shardChainDB ethdb.Database, shardID int) (*Syncer, error) { +func NewSyncer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardChainDB *database.ShardDB, shardID int) (*Syncer, error) { ctx, cancel := context.WithCancel(context.Background()) - shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB) errChan := make(chan error) - responseSent := make(chan int) - return &Syncer{config, signer, shard, p2p, ctx, cancel, errChan, responseSent}, nil + return &Syncer{config, client, shardID, shardChainDB, p2p, ctx, cancel, nil, nil, errChan}, nil } // Start the main loop for handling shard chain data requests. func (s *Syncer) Start() { log.Info("Starting sync service") - go s.handleCollationBodyRequests(s.signer, s.p2p.Feed(messages.CollationBodyRequest{})) - go s.handleServiceErrors() + + shard := sharding.NewShard(big.NewInt(int64(s.shardID)), s.shardChainDB.DB()) + + s.msgChan = make(chan p2p.Message, 100) + s.bodyRequests = s.p2p.Feed(messages.CollationBodyRequest{}).Subscribe(s.msgChan) + go s.handleCollationBodyRequests(s.client, shard) + go utils.HandleServiceErrors(s.ctx.Done(), s.errChan) } // Stop the main loop. @@ -53,43 +60,26 @@ func (s *Syncer) Stop() error { // Triggers a cancel call in the service's context which shuts down every goroutine // in this service. defer s.cancel() + defer close(s.errChan) + defer close(s.msgChan) log.Warn("Stopping sync service") + s.bodyRequests.Unsubscribe() return nil } -// handleServiceErrors manages a goroutine that listens for errors broadcast to -// this service's error channel. This serves as a final step for error logging -// and is stopped upon the service shutting down. -func (s *Syncer) handleServiceErrors() { - for { - select { - case <-s.ctx.Done(): - return - case err := <-s.errChan: - log.Error(fmt.Sprintf("Sync service error: %v", err)) - } - } -} - // handleCollationBodyRequests subscribes to messages from the shardp2p // network and responds to a specific peer that requested the body using // the Send method exposed by the p2p server's API (implementing the p2p.Sender interface). -func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, feed *event.Feed) { - - ch := make(chan p2p.Message, 100) - sub := feed.Subscribe(ch) - - defer sub.Unsubscribe() - +func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, collationFetcher sharding.CollationFetcher) { for { select { // Makes sure to close this goroutine when the service stops. case <-s.ctx.Done(): return - case req := <-ch: + case req := <-s.msgChan: if req.Data != nil { log.Info(fmt.Sprintf("Received p2p request of type: %T", req)) - res, err := RespondCollationBody(req, signer, s.shard) + res, err := RespondCollationBody(req, signer, collationFetcher) if err != nil { s.errChan <- fmt.Errorf("could not construct response: %v", err) continue @@ -98,8 +88,10 @@ func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, feed *even // Reply to that specific peer only. s.p2p.Send(*res, req.Peer) log.Info(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", res.HeaderHash.Hex())) - s.responseSent <- 1 } + case <-s.bodyRequests.Err(): + s.errChan <- errors.New("subscriber failed") + return } } } diff --git a/sharding/syncer/service_test.go b/sharding/syncer/service_test.go index db32cdbd44..5c81986677 100644 --- a/sharding/syncer/service_test.go +++ b/sharding/syncer/service_test.go @@ -1,12 +1,13 @@ package syncer import ( - "errors" "fmt" "math/big" "strings" "testing" - "time" + + "github.com/ethereum/go-ethereum/sharding/mainchain" + "github.com/ethereum/go-ethereum/sharding/params" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -19,30 +20,33 @@ import ( "github.com/ethereum/go-ethereum/sharding/database" internal "github.com/ethereum/go-ethereum/sharding/internal" "github.com/ethereum/go-ethereum/sharding/p2p" - "github.com/ethereum/go-ethereum/sharding/params" ) var _ = sharding.Service(&Syncer{}) -func TestStartStop(t *testing.T) { +func TestStop(t *testing.T) { h := internal.NewLogHandler(t) log.Root().SetHandler(h) - shardChainDB := database.NewShardKV() + shardChainDB, err := database.NewShardDB("", "", true) + if err != nil { + t.Fatalf("unable to setup db: %v", err) + } shardID := 0 server, err := p2p.NewServer() if err != nil { t.Fatalf("Unable to setup p2p server: %v", err) } - syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID) + syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID) if err != nil { t.Fatalf("Unable to setup sync service: %v", err) } - syncer.Start() - - h.VerifyLogMsg("Starting sync service") + feed := server.Feed(messages.CollationBodyRequest{}) + syncer.msgChan = make(chan p2p.Message) + syncer.errChan = make(chan error) + syncer.bodyRequests = feed.Subscribe(syncer.msgChan) if err := syncer.Stop(); err != nil { t.Fatalf("Unable to stop sync service: %v", err) @@ -63,46 +67,46 @@ func TestHandleCollationBodyRequests_FaultySigner(t *testing.T) { h := internal.NewLogHandler(t) log.Root().SetHandler(h) - shardChainDB := database.NewShardKV() + shardChainDB, err := database.NewShardDB("", "", true) + if err != nil { + t.Fatalf("unable to setup db: %v", err) + } shardID := 0 server, err := p2p.NewServer() if err != nil { t.Fatalf("Unable to setup p2p server: %v", err) } - syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID) + syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID) if err != nil { t.Fatalf("Unable to setup syncer service: %v", err) } feed := server.Feed(messages.CollationBodyRequest{}) + shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB.DB()) - go syncer.handleCollationBodyRequests(&faultySigner{}, feed) + syncer.msgChan = make(chan p2p.Message) + syncer.errChan = make(chan error) + syncer.bodyRequests = feed.Subscribe(syncer.msgChan) - go func() { - for { - select { - case <-syncer.ctx.Done(): - return - default: - msg := p2p.Message{ - Peer: p2p.Peer{}, - Data: messages.CollationBodyRequest{}, - } - feed.Send(msg) - } - } - }() + go syncer.handleCollationBodyRequests(&faultySigner{}, shard) + msg := p2p.Message{ + Peer: p2p.Peer{}, + Data: messages.CollationBodyRequest{}, + } + syncer.msgChan <- msg receivedErr := <-syncer.errChan expectedErr := "could not construct response" if !strings.Contains(receivedErr.Error(), expectedErr) { t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr) } + syncer.cancel() + // The context should have been canceled. if syncer.ctx.Err() == nil { - t.Fatal("Context was not canceled") + t.Error("Context was not canceled") } } @@ -113,7 +117,10 @@ func TestHandleCollationBodyRequests(t *testing.T) { h := internal.NewLogHandler(t) log.Root().SetHandler(h) - shardChainDB := database.NewShardKV() + shardChainDB, err := database.NewShardDB("", "", true) + if err != nil { + t.Fatalf("unable to setup db: %v", err) + } server, err := p2p.NewServer() if err != nil { t.Fatalf("Unable to setup p2p server: %v", err) @@ -138,94 +145,42 @@ func TestHandleCollationBodyRequests(t *testing.T) { // Stores the collation into the inmemory kv store shardChainDB. collation := sharding.NewCollation(header, body, nil) - shard := sharding.NewShard(shardID, shardChainDB) + shard := sharding.NewShard(shardID, shardChainDB.DB()) if err := shard.SaveCollation(collation); err != nil { t.Fatalf("Could not store collation in shardChainDB: %v", err) } - syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, 0) + syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, 0) if err != nil { t.Fatalf("Unable to setup syncer service: %v", err) } feed := server.Feed(messages.CollationBodyRequest{}) - go syncer.handleCollationBodyRequests(&mockSigner{}, feed) + syncer.msgChan = make(chan p2p.Message) + syncer.errChan = make(chan error) + syncer.bodyRequests = feed.Subscribe(syncer.msgChan) - go func() { - for { - select { - case <-syncer.ctx.Done(): - return - default: - msg := p2p.Message{ - Peer: p2p.Peer{}, - Data: messages.CollationBodyRequest{ - ChunkRoot: &chunkRoot, - ShardID: shardID, - Period: period, - Proposer: &proposerAddress, - }, - } - feed.Send(msg) - } - } - }() + go syncer.handleCollationBodyRequests(&mockSigner{}, shard) + + msg := p2p.Message{ + Peer: p2p.Peer{}, + Data: messages.CollationBodyRequest{ + ChunkRoot: &chunkRoot, + ShardID: shardID, + Period: period, + Proposer: &proposerAddress, + }, + } + syncer.msgChan <- msg - <-syncer.responseSent h.VerifyLogMsg(fmt.Sprintf("Received p2p request of type: %T", p2p.Message{})) h.VerifyLogMsg(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", header.Hash().Hex())) + syncer.cancel() // The context should have been canceled. if syncer.ctx.Err() == nil { - t.Fatal("Context was not canceled") + t.Error("Context was not canceled") } } - -// TODO: Move this to the utils package along with the handleServiceErrors -// function. -func TestHandleServiceErrors(t *testing.T) { - - h := internal.NewLogHandler(t) - log.Root().SetHandler(h) - - shardChainDB := database.NewShardKV() - shardID := 0 - server, err := p2p.NewServer() - if err != nil { - t.Fatalf("Unable to setup p2p server: %v", err) - } - - syncer, err := NewSyncer(params.DefaultConfig, &mockSigner{}, server, shardChainDB, shardID) - if err != nil { - t.Fatalf("Unable to setup syncer service: %v", err) - } - - go syncer.handleServiceErrors() - - expectedErr := "testing the error channel" - complete := make(chan int) - - go func() { - for { - select { - case <-syncer.ctx.Done(): - return - default: - syncer.errChan <- errors.New(expectedErr) - complete <- 1 - } - } - }() - - <-complete - syncer.cancel() - - // The context should have been canceled. - if syncer.ctx.Err() == nil { - t.Fatal("Context was not canceled") - } - time.Sleep(time.Millisecond * 500) - h.VerifyLogMsg(fmt.Sprintf("Sync service error: %v", expectedErr)) -} diff --git a/sharding/utils/service.go b/sharding/utils/service.go new file mode 100644 index 0000000000..fb9ea00f21 --- /dev/null +++ b/sharding/utils/service.go @@ -0,0 +1,19 @@ +package utils + +import ( + "github.com/ethereum/go-ethereum/log" +) + +// HandleServiceErrors manages a goroutine that listens for errors broadcast to +// this service's error channel. This serves as a final step for error logging +// and is stopped upon the service shutting down. +func HandleServiceErrors(done <-chan struct{}, errChan <-chan error) { + for { + select { + case <-done: + return + case err := <-errChan: + log.Error(err.Error()) + } + } +} diff --git a/sharding/utils/service_test.go b/sharding/utils/service_test.go new file mode 100644 index 0000000000..1578948b9e --- /dev/null +++ b/sharding/utils/service_test.go @@ -0,0 +1,22 @@ +package utils + +import ( + "errors" + "testing" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding/internal" +) + +func TestHandleServiceErrors(t *testing.T) { + h := internal.NewLogHandler(t) + log.Root().SetHandler(h) + done := make(chan struct{}) + errChan := make(chan error) + + go HandleServiceErrors(done, errChan) + + errChan <- errors.New("something wrong") + done <- struct{}{} + h.VerifyLogMsg("something wrong") +}