FIx start/stop of ShardEthereum (#185)

* sharding: Fix mainchain.Client starting RPC connections during ShardEthereum.New. Fix graceful stop

* sharding: Just pass the cli.Context rather than keeping it on the shardEthereum

* sharding: add doc

* sharding: add doc

* Sharding: remove exgtra newline

* sharding:fix lint


Former-commit-id: fdaf8160245d9233b693f685ba6078e4b15fa279 [formerly f7fa71912b7d8340ede6cd08b357056fafbab014]
Former-commit-id: 0e8bfbbc579451178f76263364fdbcd00a91d651
This commit is contained in:
Preston Van Loon
2018-06-16 22:26:03 -04:00
committed by GitHub
parent 9e1c76e693
commit 912b3b65bd
7 changed files with 116 additions and 60 deletions

View File

@@ -31,7 +31,6 @@ func shardingCmd(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("could not initialize sharding node instance: %v", err)
}
defer shardingNode.Close()
// starts a connection to a geth node and kicks off every registered service.
shardingNode.Start()
return nil

View File

@@ -23,6 +23,9 @@ Now you should have a remote pointing to the `origin` repo (geth-sharding) and t
- Go to the [geth-sharding](https://github.com/prysmaticlabs/geth-sharding) repository on Github and start a PR comparing `geth-sharding:master` with `go-ethereum:collations-pool` (your fork on your profile).
- Add a clear PR title along with a description of what this PR encompasses, when it can be closed, and what you are currently working on. Github markdown checklists work great for this.
Pull requests must be cleanly rebased ontop of master. If master advances while your PR is in review, please keep rebasing it.
Before the pull request is merged, make sure that you squash your commits into one commit using `git rebase -i` and `git push -f`. After every commit the test suite must be passing.
## Contributor Responsibilities
We consider two types of contributions to our repo and categorize them as follows:

View File

@@ -126,6 +126,7 @@ After reading the Sharding FAQ, it is important to understand the minimal implem
## Necessary Go Knowledge & Readings
- [The Go Programming Language (Only Recommended Book)](https://www.amazon.com/Programming-Language-Addison-Wesley-Professional-Computing/dp/0134190440)
- [Ethereum Development with Go] (https://goethereumbook.org)
- [How to Write Go Code](http://golang.org/doc/code.html)
- [The Go Programming Language Tour](http://tour.golang.org/)
- [Getting Started With Go](http://www.youtube.com/watch?v=2KmHtgtEZ1s)

View File

@@ -74,41 +74,55 @@ func NewSMCClient(endpoint string, dataDirPath string, depositFlag bool, passwor
ks := keystore.NewKeyStore(keydir, scryptN, scryptP)
// Sets up a connection to a Geth node via RPC.
rpcClient, err := dialRPC(endpoint)
if err != nil {
return nil, fmt.Errorf("cannot start rpc client: %v", err)
}
client := ethclient.NewClient(rpcClient)
// Check account existence and unlock account before starting.
accounts := ks.Accounts()
if len(accounts) == 0 {
return nil, fmt.Errorf("no accounts found")
}
smcClient := &SMCClient{
keystore: ks,
endpoint: endpoint,
depositFlag: depositFlag,
dataDirPath: dataDirPath,
passwordFile: passwordFile,
rpcClient: rpcClient,
client: client,
}
if err := smcClient.unlockAccount(accounts[0]); err != nil {
return nil, fmt.Errorf("cannot unlock account: %v", err)
return smcClient, nil
}
// Start the SMC Client and connect to running geth node.
func (s *SMCClient) Start() {
// Sets up a connection to a Geth node via RPC.
rpcClient, err := dialRPC(s.endpoint)
if err != nil {
log.Crit(fmt.Sprintf("Cannot start rpc client: %v", err))
return
}
s.rpcClient = rpcClient
s.client = ethclient.NewClient(rpcClient)
// Check account existence and unlock account before starting.
accounts := s.keystore.Accounts()
if len(accounts) == 0 {
log.Crit("No accounts found")
return
}
if err := s.unlockAccount(accounts[0]); err != nil {
log.Crit(fmt.Sprintf("Cannot unlock account: %v", err))
return
}
// Initializes bindings to SMC.
smc, err := initSMC(smcClient)
smc, err := initSMC(s)
if err != nil {
return nil, err
log.Crit(fmt.Sprintf("Failed to initialize SMC: %v", err))
return
}
smcClient.smc = smc
return smcClient, nil
s.smc = smc
}
// Stop SMCClient immediately. This cancels any pending RPC connections.
func (s *SMCClient) Stop() error {
s.rpcClient.Close()
return nil
}
// CreateTXOpts creates a *TransactOpts with a signer using the default account on the keystore.

View File

@@ -1,4 +1,9 @@
package mainchain
import "github.com/ethereum/go-ethereum/sharding"
// Verifies that SMCClient implements the Client interface.
var _ = Client(&SMCClient{})
// Verifies that SMCCLient implements the sharding Service inteface.
var _ = sharding.Service(&SMCClient{})

View File

@@ -36,24 +36,24 @@ const shardChainDbName = "shardchaindata"
// it contains APIs and fields that handle the different components of the sharded
// Ethereum network.
type ShardEthereum struct {
shardConfig *params.Config // Holds necessary information to configure shards.
txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed.
actor sharding.Actor // Either notary, proposer, or observer.
shardChainDb ethdb.Database // Access to the persistent db to store shard data.
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
smcClient *mainchain.SMCClient // Provides bindings to the SMC deployed on the Ethereum mainchain.
shardConfig *params.Config // Holds necessary information to configure shards.
txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed.
actor sharding.Actor // Either notary, proposer, or observer.
shardChainDb ethdb.Database // Access to the persistent db to store shard data.
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
// Lifecycle and service stores.
services map[reflect.Type]sharding.Service // Service registry.
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications
}
// New creates a new sharding-enabled Ethereum instance. This is called in the main
// geth sharding entrypoint.
func New(ctx *cli.Context) (*ShardEthereum, error) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]sharding.Service),
stop: make(chan struct{}),
}
path := node.DefaultDataDir()
@@ -61,46 +61,32 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
}
endpoint := ctx.Args().First()
if endpoint == "" {
endpoint = fmt.Sprintf("%s/%s.ipc", path, mainchain.ClientIdentifier)
}
if ctx.GlobalIsSet(utils.IPCPathFlag.Name) {
endpoint = ctx.GlobalString(utils.IPCPathFlag.Name)
}
passwordFile := ctx.GlobalString(utils.PasswordFileFlag.Name)
depositFlag := ctx.GlobalBool(utils.DepositFlag.Name)
actorFlag := ctx.GlobalString(utils.ActorFlag.Name)
shardIDFlag := ctx.GlobalInt(utils.ShardIDFlag.Name)
smcClient, err := mainchain.NewSMCClient(endpoint, path, depositFlag, passwordFile)
if err != nil {
return nil, err
}
shardChainDb, err := database.NewShardDB(path, shardChainDbName)
if err != nil {
return nil, err
}
// Adds the initialized SMCClient to the ShardEthereum instance.
shardEthereum.smcClient = smcClient
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
// Adds the initialized shardChainDb to the ShardEthereum instance.
// TODO: Move out of here!
shardEthereum.shardChainDb = shardChainDb
if err := shardEthereum.registerP2P(); err != nil {
return nil, err
}
if err := shardEthereum.registerMainchainClient(ctx); err != nil {
return nil, err
}
actorFlag := ctx.GlobalString(utils.ActorFlag.Name)
if err := shardEthereum.registerTXPool(actorFlag); err != nil {
return nil, err
}
shardIDFlag := ctx.GlobalInt(utils.ShardIDFlag.Name)
if err := shardEthereum.registerActorService(shardEthereum.shardConfig, actorFlag, shardIDFlag); err != nil {
return nil, err
}
@@ -110,6 +96,7 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
// Start the ShardEthereum service and kicks off the p2p and actor's main loop.
func (s *ShardEthereum) Start() {
s.lock.Lock()
log.Info("Starting sharding node")
@@ -118,6 +105,9 @@ func (s *ShardEthereum) Start() {
service.Start()
}
stop := s.stop
s.lock.Unlock()
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
@@ -136,24 +126,24 @@ func (s *ShardEthereum) Start() {
debug.LoudPanic("boom")
}()
// hang forever...
select {}
// Wait for stop channel to be closed
<-stop
}
// Close handles graceful shutdown of the system.
func (s *ShardEthereum) Close() {
s.lock.Lock()
defer s.lock.Unlock()
for kind, service := range s.services {
if err := service.Stop(); err != nil {
log.Crit(fmt.Sprintf("Could not stop the following service: %v, %v", kind, err))
}
}
log.Info("Stopping sharding node")
}
// SMCClient returns an instance of a client that communicates to a mainchain node via
// RPC and provides helpful bindings to the Sharding Manager Contract.
func (s *ShardEthereum) SMCClient() *mainchain.SMCClient {
return s.smcClient
// unblock n.Wait
close(s.stop)
}
// Register appends a service constructor function to the service registry of the
@@ -194,6 +184,28 @@ func (s *ShardEthereum) registerP2P() error {
})
}
// registerMainchainClient
func (s *ShardEthereum) registerMainchainClient(ctx *cli.Context) error {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
}
endpoint := ctx.Args().First()
if endpoint == "" {
endpoint = fmt.Sprintf("%s/%s.ipc", path, mainchain.ClientIdentifier)
}
if ctx.GlobalIsSet(utils.IPCPathFlag.Name) {
endpoint = ctx.GlobalString(utils.IPCPathFlag.Name)
}
passwordFile := ctx.GlobalString(utils.PasswordFileFlag.Name)
depositFlag := ctx.GlobalBool(utils.DepositFlag.Name)
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
return mainchain.NewSMCClient(endpoint, path, depositFlag, passwordFile)
})
}
// registerTXPool is only relevant to proposers in the sharded system. It will
// spin up a transaction pool that will relay incoming transactions via an
// event feed. For our first releases, this can just relay test/fake transaction data
@@ -216,13 +228,15 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
if actor == "notary" {
return notary.NewNotary(config, s.smcClient, p2p, s.shardChainDb)
return notary.NewNotary(config, smcClient, p2p, s.shardChainDb)
} else if actor == "proposer" {
var txPool *txpool.TXPool
ctx.RetrieveService(&txPool)
return proposer.NewProposer(config, s.smcClient, p2p, txPool, s.shardChainDb, shardID)
return proposer.NewProposer(config, smcClient, p2p, txPool, s.shardChainDb, shardID)
}
return observer.NewObserver(p2p, s.shardChainDb, shardID)
})

View File

@@ -1,6 +1,26 @@
package node
import "github.com/ethereum/go-ethereum/sharding"
import (
"flag"
"testing"
"github.com/ethereum/go-ethereum/sharding"
cli "gopkg.in/urfave/cli.v1"
)
// Verifies that ShardEthereum implements the Node interface.
var _ = sharding.Node(&ShardEthereum{})
// Test that the sharding node can build with default flag values.
func TestNode_Builds(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
context := cli.NewContext(app, set, nil)
_, err := New(context)
if err != nil {
t.Fatalf("Failed to create ShardEthereum: %v", err)
}
}