Simplify Service Registry, Pass in References as Dependencies to Prevent Nil Pointer Errors (#217)

sharding: simplify service registry to prevent nil pointer errors
Former-commit-id: ba4833c385e5212723932491810baad62e3ff0f9 [formerly c550c6d0837999f46a6de55a36fb1ae92d2ecd6f]
Former-commit-id: 80e9e13bc811444b461dad6bdf9eec633b911bec
This commit is contained in:
Raul Jordan
2018-06-28 20:56:51 -04:00
committed by GitHub
parent f8d4cdda84
commit f3f5b8e5a6
9 changed files with 173 additions and 142 deletions

View File

@@ -1,9 +1,6 @@
package sharding
import (
"fmt"
"reflect"
"github.com/ethereum/go-ethereum/common"
)
@@ -13,7 +10,6 @@ import (
type Node interface {
Start()
Close()
Register(constructor ServiceConstructor) error
}
// Actor refers to either a notary, proposer, or observer in the sharding spec.
@@ -28,17 +24,6 @@ type CollationFetcher interface {
CollationByHeaderHash(headerHash *common.Hash) (*Collation, error)
}
// ServiceContext is a collection of service independent options inherited from
// the protocol stack, that is passed to all constructors to be optionally used;
// as well as utility methods to operate on the service environment.
type ServiceContext struct {
Services map[reflect.Type]Service // Index of the already constructed services
}
// ServiceConstructor is the function signature of the constructors needed to be
// registered for service instantiation.
type ServiceConstructor func(ctx *ServiceContext) (Service, error)
// Service is an individual protocol that can be registered into a node. Having a sharding
// node maintain a service registry allows for easy, shared-dependencies. For example,
// a proposer service might depend on a p2p server, a txpool, an smc client, etc.
@@ -50,14 +35,3 @@ type Service interface {
// blocking until they are all terminated.
Stop() error
}
// RetrieveService sets the `service` argument to a currently running service
// registered of a specific type.
func (ctx *ServiceContext) RetrieveService(service interface{}) error {
element := reflect.ValueOf(service).Elem()
if running, ok := ctx.Services[element.Type()]; ok {
element.Set(reflect.ValueOf(running))
return nil
}
return fmt.Errorf("unknown service: %T", service)
}

View File

@@ -11,6 +11,7 @@ import (
"reflect"
"sync"
"syscall"
"time"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/event"
@@ -31,7 +32,7 @@ import (
"gopkg.in/urfave/cli.v1"
)
const shardChainDbName = "shardchaindata"
const shardChainDBName = "shardchaindata"
// ShardEthereum is a service that is registered and started when geth is launched.
// it contains APIs and fields that handle the different components of the sharded
@@ -43,9 +44,10 @@ type ShardEthereum struct {
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
// Lifecycle and service stores.
services map[reflect.Type]sharding.Service // Service registry.
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications
services map[reflect.Type]sharding.Service // Service registry.
serviceTypes []reflect.Type // Keeps an ordered slice of registered service types.
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications
}
// New creates a new sharding-enabled Ethereum instance. This is called in the main
@@ -81,12 +83,8 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
return nil, err
}
// Should not trigger simulation requests if actor is a notary, as this
// is supposed to "simulate" notaries sending requests via p2p.
if actorFlag != "notary" {
if err := shardEthereum.registerSimulatorService(shardEthereum.shardConfig, shardIDFlag); err != nil {
return nil, err
}
if err := shardEthereum.registerSimulatorService(actorFlag, shardEthereum.shardConfig, shardIDFlag); err != nil {
return nil, err
}
if err := shardEthereum.registerSyncerService(shardEthereum.shardConfig, shardIDFlag); err != nil {
@@ -102,9 +100,9 @@ func (s *ShardEthereum) Start() {
log.Info("Starting sharding node")
for _, service := range s.services {
// Start the next service.
service.Start()
for _, kind := range s.serviceTypes {
// Start each service in order of registration.
s.services[kind].Start()
}
stop := s.stop
@@ -148,53 +146,55 @@ func (s *ShardEthereum) Close() {
close(s.stop)
}
// Register appends a service constructor function to the service registry of the
// registerService appends a service constructor function to the service registry of the
// sharding node.
func (s *ShardEthereum) Register(constructor sharding.ServiceConstructor) error {
s.lock.Lock()
defer s.lock.Unlock()
ctx := &sharding.ServiceContext{
Services: make(map[reflect.Type]sharding.Service),
}
// Copy needed for threaded access.
for kind, s := range s.services {
ctx.Services[kind] = s
}
service, err := constructor(ctx)
if err != nil {
return err
}
func (s *ShardEthereum) registerService(service sharding.Service) error {
kind := reflect.TypeOf(service)
if _, exists := s.services[kind]; exists {
return fmt.Errorf("service already exists: %v", kind)
}
s.services[kind] = service
s.serviceTypes = append(s.serviceTypes, kind)
return nil
}
// fetchService takes in a struct pointer and sets the value of that pointer
// to a service currently stored in the service registry. This ensures the input argument is
// set to the right pointer that refers to the originally registered service.
func (s *ShardEthereum) fetchService(service interface{}) error {
if reflect.TypeOf(service).Kind() != reflect.Ptr {
return fmt.Errorf("input must be of pointer type, received value type instead: %T", service)
}
element := reflect.ValueOf(service).Elem()
if running, ok := s.services[element.Type()]; ok {
element.Set(reflect.ValueOf(running))
return nil
}
return fmt.Errorf("unknown service: %T", service)
}
// registerShardChainDB attaches a LevelDB wrapped object to the shardEthereum instance.
func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
}
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
return database.NewShardDB(path, shardChainDbName, false)
})
shardDB, err := database.NewShardDB(path, shardChainDBName, false)
if err != nil {
return fmt.Errorf("could not register shardDB service: %v", err)
}
return s.registerService(shardDB)
}
// registerP2P attaches a p2p server to the ShardEthereum instance.
// TODO: Design this p2p service and the methods it should expose as well as
// its event loop.
func (s *ShardEthereum) registerP2P() error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
return p2p.NewServer()
})
shardp2p, err := p2p.NewServer()
if err != nil {
return fmt.Errorf("could not register shardp2p service: %v", err)
}
return s.registerService(shardp2p)
}
// registerMainchainClient
@@ -214,9 +214,11 @@ func (s *ShardEthereum) registerMainchainClient(ctx *cli.Context) error {
passwordFile := ctx.GlobalString(utils.PasswordFileFlag.Name)
depositFlag := ctx.GlobalBool(utils.DepositFlag.Name)
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
return mainchain.NewSMCClient(endpoint, path, depositFlag, passwordFile)
})
client, err := mainchain.NewSMCClient(endpoint, path, depositFlag, passwordFile)
if err != nil {
return fmt.Errorf("could not register smc client service: %v", err)
}
return s.registerService(client)
}
// registerTXPool is only relevant to proposers in the sharded system. It will
@@ -228,53 +230,101 @@ func (s *ShardEthereum) registerTXPool(actor string) error {
if actor != "proposer" {
return nil
}
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
return txpool.NewTXPool(p2p)
})
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
return err
}
pool, err := txpool.NewTXPool(shardp2p)
if err != nil {
return fmt.Errorf("could not register shard txpool service: %v", err)
}
return s.registerService(pool)
}
// Registers the actor according to CLI flags. Either notary/proposer/observer.
func (s *ShardEthereum) registerActorService(config *params.Config, actor string, shardID int) error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
return err
}
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
var shardChainDB *database.ShardDB
ctx.RetrieveService(&shardChainDB)
var shardChainDB *database.ShardDB
if err := s.fetchService(&shardChainDB); err != nil {
return err
}
if actor == "notary" {
return notary.NewNotary(config, smcClient, p2p, shardChainDB)
} else if actor == "proposer" {
var txPool *txpool.TXPool
ctx.RetrieveService(&txPool)
return proposer.NewProposer(config, smcClient, p2p, txPool, shardChainDB.DB(), shardID)
if actor == "notary" {
not, err := notary.NewNotary(config, client, shardp2p, shardChainDB)
if err != nil {
return fmt.Errorf("could not register notary service: %v", err)
}
return observer.NewObserver(p2p, shardChainDB.DB(), shardID)
})
return s.registerService(not)
} else if actor == "proposer" {
var pool *txpool.TXPool
if err := s.fetchService(&pool); err != nil {
return err
}
prop, err := proposer.NewProposer(config, client, shardp2p, pool, shardChainDB, shardID)
if err != nil {
return fmt.Errorf("could not register proposer service: %v", err)
}
return s.registerService(prop)
}
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
}
func (s *ShardEthereum) registerSimulatorService(config *params.Config, shardID int) error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
return simulator.NewSimulator(config, smcClient, p2p, shardID, 15) // 15 second delay between simulator requests.
})
func (s *ShardEthereum) registerSimulatorService(actorFlag string, config *params.Config, shardID int) error {
// Should not trigger simulation requests if actor is a notary, as this
// is supposed to "simulate" notaries sending requests via p2p.
if actorFlag == "notary" {
return nil
}
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
return err
}
// 15 second delay between simulator requests.
sim, err := simulator.NewSimulator(config, client, shardp2p, shardID, 15*time.Second)
if err != nil {
return fmt.Errorf("could not register simulator service: %v", err)
}
return s.registerService(sim)
}
func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int) error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
var shardChainDB *database.ShardDB
ctx.RetrieveService(&shardChainDB)
return syncer.NewSyncer(config, smcClient, p2p, shardChainDB, shardID)
})
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
return err
}
var shardChainDB *database.ShardDB
if err := s.fetchService(&shardChainDB); err != nil {
return err
}
sync, err := syncer.NewSyncer(config, client, shardp2p, shardChainDB, shardID)
if err != nil {
return fmt.Errorf("could not register syncer service: %v", err)
}
return s.registerService(sync)
}

View File

@@ -16,15 +16,15 @@ import (
// in a sharded system. Must satisfy the Service interface defined in
// sharding/service.go.
type Notary struct {
config *params.Config
smcClient *mainchain.SMCClient
p2p *p2p.Server
shardChainDb *database.ShardDB
config *params.Config
smcClient *mainchain.SMCClient
p2p *p2p.Server
dbService *database.ShardDB
}
// NewNotary creates a new notary instance.
func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, shardChainDb *database.ShardDB) (*Notary, error) {
return &Notary{config, smcClient, p2p, shardChainDb}, nil
func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, dbService *database.ShardDB) (*Notary, error) {
return &Notary{config, smcClient, p2p, dbService}, nil
}
// Start the main routine for a notary.

View File

@@ -5,11 +5,9 @@ package observer
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/database"
"github.com/ethereum/go-ethereum/sharding/p2p"
)
@@ -17,23 +15,24 @@ import (
// in a sharded system. Must satisfy the Service interface defined in
// sharding/service.go.
type Observer struct {
p2p *p2p.Server
shard *sharding.Shard
ctx context.Context
cancel context.CancelFunc
p2p *p2p.Server
dbService *database.ShardDB
shardID int
ctx context.Context
cancel context.CancelFunc
}
// NewObserver creates a struct instance of a observer service,
// it will have access to a p2p server and a shardChainDb.
func NewObserver(p2p *p2p.Server, shardChainDB ethdb.Database, shardID int) (*Observer, error) {
// it will have access to a p2p server and a shardChainDB.
func NewObserver(p2p *p2p.Server, dbService *database.ShardDB, shardID int) (*Observer, error) {
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
return &Observer{p2p, shard, ctx, cancel}, nil
return &Observer{p2p, dbService, shardID, ctx, cancel}, nil
}
// Start the main loop for observer service.
func (o *Observer) Start() {
log.Info(fmt.Sprintf("Starting observer service"))
// shard := sharding.NewShard(big.NewInt(int64(o.shardID)), o.dbService.DB())
}
// Stop the main loop for observer service.

View File

@@ -21,7 +21,10 @@ func TestStartStop(t *testing.T) {
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
shardChainDB := database.NewShardKV()
shardChainDB, err := database.NewShardDB("", "", true)
if err != nil {
t.Fatalf("Unable to setup db: %v", err)
}
shardID := 0
observer, err := NewObserver(server, shardChainDB, shardID)

View File

@@ -8,10 +8,10 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/database"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
@@ -22,31 +22,32 @@ import (
// in a sharded system. Must satisfy the Service interface defined in
// sharding/service.go.
type Proposer struct {
config *params.Config
client *mainchain.SMCClient
p2p *p2p.Server
txpool *txpool.TXPool
txpoolSub event.Subscription
shardChainDb ethdb.Database
shard *sharding.Shard
ctx context.Context
cancel context.CancelFunc
config *params.Config
client *mainchain.SMCClient
p2p *p2p.Server
txpool *txpool.TXPool
txpoolSub event.Subscription
dbService *database.ShardDB
shardID int
shard *sharding.Shard
ctx context.Context
cancel context.CancelFunc
}
// NewProposer creates a struct instance of a proposer service.
// It will have access to a mainchain client, a p2p network,
// and a shard transaction pool.
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDB ethdb.Database, shardID int) (*Proposer, error) {
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, dbService *database.ShardDB, shardID int) (*Proposer, error) {
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
return &Proposer{
config,
client,
p2p,
txpool,
nil,
shardChainDB,
shard,
dbService,
shardID,
nil,
ctx,
cancel}, nil
}
@@ -54,6 +55,8 @@ func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Se
// Start the main loop for proposing collations.
func (p *Proposer) Start() {
log.Info("Starting proposer service")
shard := sharding.NewShard(big.NewInt(int64(p.shardID)), p.dbService.DB())
p.shard = shard
go p.proposeCollations()
}
@@ -78,10 +81,10 @@ func (p *Proposer) proposeCollations() {
log.Error(fmt.Sprintf("Create collation failed: %v", err))
}
case <-p.ctx.Done():
log.Error("Proposer context closed, exiting goroutine")
log.Debug("Proposer context closed, exiting goroutine")
return
case err := <-p.txpoolSub.Err():
log.Error(fmt.Sprintf("Subscriber closed: %v", err))
case <-p.txpoolSub.Err():
log.Debug("Subscriber closed")
return
}
}
@@ -107,7 +110,7 @@ func (p *Proposer) createCollation(ctx context.Context, txs []*types.Transaction
return nil
}
log.Info(fmt.Sprintf("Saved collation with header hash %v to shardChainDb", collation.Header().Hash().Hex()))
log.Info(fmt.Sprintf("Saved collation with header hash %v to shardChainDB", collation.Header().Hash().Hex()))
// Check SMC if we can submit header before addHeader.
canAdd, err := checkHeaderAdded(p.client, p.shard.ShardID(), period)

View File

@@ -72,6 +72,7 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read
select {
// Makes sure to close this goroutine when the service stops.
case <-s.ctx.Done():
log.Debug("Simulator context closed, exiting goroutine")
return
case <-delayChan:
blockNumber, err := reader.BlockByNumber(s.ctx, nil)

View File

@@ -204,6 +204,7 @@ func TestSimulateNotaryRequests(t *testing.T) {
delayChan <- time.Time{}
delayChan <- time.Time{}
h.VerifyLogMsg("Simulator context closed, exiting goroutine")
h.VerifyLogMsg("Sent request for collation body via a shardp2p feed")
simulator.cancel()

View File

@@ -36,7 +36,7 @@ type Syncer struct {
// NewSyncer creates a struct instance of a syncer service.
// It will have access to config, a signer, a p2p server,
// a shardChainDb, and a shardID.
// a shardChainDB, and a shardID.
func NewSyncer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardChainDB *database.ShardDB, shardID int) (*Syncer, error) {
ctx, cancel := context.WithCancel(context.Background())
errChan := make(chan error)