Cleanup refactoring for sharding client interface and collator.

Former-commit-id: 9d452ada62e9afe7295d07b2e7650736e640b39a [formerly ef6fcf4365cff18f14e9bfbd43d6b9d362abfbe4]
Former-commit-id: ede05e77ef22b10fd7d12ac635d2879165416904
This commit is contained in:
Preston Van Loon
2018-03-31 14:25:39 -04:00
parent 9614c81228
commit 13789b6e63
6 changed files with 101 additions and 103 deletions

View File

@@ -36,12 +36,8 @@ Launches a sharding proposer client that connects to a running geth node and pro
)
func collatorClient(ctx *cli.Context) error {
c := collator.NewCollatorClient(ctx)
if err := collator.CollatorStart(c); err != nil {
return err
}
c.Wait()
return nil
c := collator.NewCollator(ctx)
return c.Start()
}
func proposerClient(ctx *cli.Context) error {

View File

@@ -8,7 +8,7 @@ import (
"math/big"
"os"
"github.com/ethereum/go-ethereum"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -17,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/sharding/contracts"
@@ -30,22 +29,27 @@ const (
// General Client for Collator. Communicates to Geth node via JSON RPC.
type ShardingClient struct {
endpoint string // Endpoint to JSON RPC
client *ethclient.Client // Ethereum RPC client.
keystore *keystore.KeyStore // Keystore containing the single signer
Ctx *cli.Context // Command line context
Smc *contracts.SMC // The deployed sharding management contract
type shardingClient struct {
endpoint string // Endpoint to JSON RPC
client *ethclient.Client // Ethereum RPC client.
keystore *keystore.KeyStore // Keystore containing the single signer
ctx *cli.Context // Command line context
smc *contracts.SMC // The deployed sharding management contract
rpcClient *rpc.Client // The RPC client connection to the main geth node
}
type Client interface {
MakeClient(*cli.Context) *ShardingClient
// MakeClient(*cli.Context) *shardingClient
Start() error
Close()
CreateTXOps(*big.Int) (*bind.TransactOpts, error)
initSMC() error
ChainReader() ethereum.ChainReader
Account() *accounts.Account
SMCCaller() *contracts.SMCCaller
SMCTransactor() *contracts.SMCTransactor
}
func MakeClient(ctx *cli.Context) *ShardingClient {
func NewClient(ctx *cli.Context) *shardingClient {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
@@ -69,51 +73,54 @@ func MakeClient(ctx *cli.Context) *ShardingClient {
}
ks := keystore.NewKeyStore(keydir, scryptN, scryptP)
return &ShardingClient{
return &shardingClient{
endpoint: endpoint,
keystore: ks,
Ctx: ctx,
ctx: ctx,
}
}
// Start the sharding client.
// * Connects to Geth node.
// * Verifies or deploys the sharding manager contract.
func (c *ShardingClient) Start() (*rpc.Client, error) {
func (c *shardingClient) Start() error {
rpcClient, err := dialRPC(c.endpoint)
if err != nil {
return nil, fmt.Errorf("cannot start rpc client. %v", err)
return fmt.Errorf("cannot start rpc client. %v", err)
}
c.rpcClient = rpcClient
c.client = ethclient.NewClient(rpcClient)
// Check account existence and unlock account before starting collator client
accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return nil, fmt.Errorf("no accounts found")
return fmt.Errorf("no accounts found")
}
if err := c.unlockAccount(accounts[0]); err != nil {
return nil, fmt.Errorf("cannot unlock account. %v", err)
return fmt.Errorf("cannot unlock account. %v", err)
}
if err := initSMC(c); err != nil {
return nil, err
smc, err := initSMC(c)
if err != nil {
return err
}
c.smc = smc
return rpcClient, nil
return nil
}
// Wait until collator client is shutdown.
func (c *ShardingClient) Wait() {
log.Info("Sharding client has been shutdown...")
// Close the RPC client connection
func (c *shardingClient) Close() {
c.rpcClient.Close()
}
// UnlockAccount will unlock the specified account using utils.PasswordFileFlag or empty string if unset.
func (c *ShardingClient) unlockAccount(account accounts.Account) error {
func (c *shardingClient) unlockAccount(account accounts.Account) error {
pass := ""
if c.Ctx.GlobalIsSet(utils.PasswordFileFlag.Name) {
file, err := os.Open(c.Ctx.GlobalString(utils.PasswordFileFlag.Name))
if c.ctx.GlobalIsSet(utils.PasswordFileFlag.Name) {
file, err := os.Open(c.ctx.GlobalString(utils.PasswordFileFlag.Name))
if err != nil {
return fmt.Errorf("unable to open file containing account password %s. %v", utils.PasswordFileFlag.Value, err)
}
@@ -133,7 +140,8 @@ func (c *ShardingClient) unlockAccount(account accounts.Account) error {
return c.keystore.Unlock(account, pass)
}
func (c *ShardingClient) CreateTXOps(value *big.Int) (*bind.TransactOpts, error) {
// CreateTXOps creates a *TransactOpts with a signer using the default account on the keystore.
func (c *shardingClient) CreateTXOps(value *big.Int) (*bind.TransactOpts, error) {
account := c.Account()
return &bind.TransactOpts{
@@ -150,25 +158,29 @@ func (c *ShardingClient) CreateTXOps(value *big.Int) (*bind.TransactOpts, error)
}
// Account to use for sharding transactions.
func (c *ShardingClient) Account() *accounts.Account {
func (c *shardingClient) Account() *accounts.Account {
accounts := c.keystore.Accounts()
return &accounts[0]
}
// ChainReader for interacting with the chain.
func (c *ShardingClient) ChainReader() ethereum.ChainReader {
func (c *shardingClient) ChainReader() ethereum.ChainReader {
return ethereum.ChainReader(c.client)
}
// Client to interact with ethereum node.
func (c *ShardingClient) ethereumClient() *ethclient.Client {
func (c *shardingClient) ethereumClient() *ethclient.Client {
return c.client
}
// SMCCaller to interact with the sharding manager contract.
func (c *ShardingClient) SMCCaller() *contracts.SMCCaller {
return &c.Smc.SMCCaller
func (c *shardingClient) SMCCaller() *contracts.SMCCaller {
return &c.smc.SMCCaller
}
func (c *shardingClient) SMCTransactor() *contracts.SMCTransactor {
return &c.smc.SMCTransactor
}
// dialRPC endpoint to node.

View File

@@ -13,41 +13,42 @@ import (
// initSMC initializes the sharding manager contract bindings.
// If the SMC does not exist, it will be deployed.
func initSMC(c *ShardingClient) error {
func initSMC(c *shardingClient) (*contracts.SMC, error) {
b, err := c.client.CodeAt(context.Background(), sharding.ShardingManagerAddress, nil)
if err != nil {
return fmt.Errorf("unable to get contract code at %s: %v", sharding.ShardingManagerAddress, err)
return nil, fmt.Errorf("unable to get contract code at %s: %v", sharding.ShardingManagerAddress, err)
}
// Deploy SMC for development only.
// TODO: Separate contract deployment from the sharding client. It would only need to be deployed
// once on the mainnet, so this code would not need to ship with the client.
if len(b) == 0 {
log.Info(fmt.Sprintf("No sharding manager contract found at %s. Deploying new contract.", sharding.ShardingManagerAddress.String()))
txOps, err := c.CreateTXOps(big.NewInt(0))
if err != nil {
return fmt.Errorf("unable to intiate the transaction: %v", err)
return nil, fmt.Errorf("unable to intiate the transaction: %v", err)
}
addr, tx, contract, err := contracts.DeploySMC(txOps, c.client)
if err != nil {
return fmt.Errorf("unable to deploy sharding manager contract: %v", err)
return nil, fmt.Errorf("unable to deploy sharding manager contract: %v", err)
}
for pending := true; pending; _, pending, err = c.client.TransactionByHash(context.Background(), tx.Hash()) {
if err != nil {
return fmt.Errorf("unable to get transaction by hash: %v", err)
return nil, fmt.Errorf("unable to get transaction by hash: %v", err)
}
time.Sleep(1 * time.Second)
}
c.Smc = contract
return contract, nil
log.Info(fmt.Sprintf("New contract deployed at %s", addr.String()))
} else {
contract, err := contracts.NewSMC(sharding.ShardingManagerAddress, c.client)
if err != nil {
return fmt.Errorf("failed to create sharding contract: %v", err)
}
c.Smc = contract
}
return nil
contract, err := contracts.NewSMC(sharding.ShardingManagerAddress, c.client)
if err != nil {
return nil, fmt.Errorf("failed to create sharding contract: %v", err)
}
return contract, nil
}

View File

@@ -5,31 +5,21 @@ import (
"fmt"
"math/big"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/contracts"
"github.com/ethereum/go-ethereum/sharding/client"
)
type collator interface {
Account() *accounts.Account
ChainReader() ethereum.ChainReader
SMCCaller() *contracts.SMCCaller
}
// SubscribeBlockHeaders checks incoming block headers and determines if
// we are an eligible collator for collations. Then, it finds the pending tx's
// from the running geth node and sorts them by descending order of gas price,
// eliminates those that ask for too much gas, and routes them over
// to the SMC to create a collation
func subscribeBlockHeaders(c collator) error {
func subscribeBlockHeaders(c client.Client) error {
headerChan := make(chan *types.Header, 16)
account := c.Account()
_, err := c.ChainReader().SubscribeNewHead(context.Background(), headerChan)
if err != nil {
return fmt.Errorf("unable to subscribe to incoming headers. %v", err)
@@ -54,7 +44,6 @@ func subscribeBlockHeaders(c collator) error {
return fmt.Errorf("unable to watch shards. %v", err)
}
} else {
log.Warn(fmt.Sprintf("Account %s not in collator pool.", account.Address.String()))
}
}
@@ -64,9 +53,7 @@ func subscribeBlockHeaders(c collator) error {
// collation for the available shards in the SMC. The function calls
// getEligibleCollator from the SMC and collator a collation if
// conditions are met
func checkSMCForCollator(c collator, head *types.Header) error {
account := c.Account()
func checkSMCForCollator(c client.Client, head *types.Header) error {
log.Info("Checking if we are an eligible collation collator for a shard...")
period := big.NewInt(0).Div(head.Number, big.NewInt(sharding.PeriodLength))
for s := int64(0); s < sharding.ShardCount; s++ {
@@ -78,7 +65,7 @@ func checkSMCForCollator(c collator, head *types.Header) error {
}
// If output is non-empty and the addr == coinbase
if addr == account.Address {
if addr == c.Account().Address {
log.Info(fmt.Sprintf("Selected as collator on shard: %d", s))
err := submitCollation(s)
if err != nil {
@@ -94,10 +81,14 @@ func checkSMCForCollator(c collator, head *types.Header) error {
// we can't guarantee our tx for deposit will be in the next block header we receive.
// The function calls IsCollatorDeposited from the SMC and returns true if
// the client is in the collator pool
func isAccountInCollatorPool(c collator) (bool, error) {
func isAccountInCollatorPool(c client.Client) (bool, error) {
account := c.Account()
// Checks if our deposit has gone through according to the SMC
return c.SMCCaller().IsCollatorDeposited(&bind.CallOpts{}, account.Address)
b, err := c.SMCCaller().IsCollatorDeposited(&bind.CallOpts{}, account.Address)
if !b && err != nil {
log.Warn(fmt.Sprintf("Account %s not in collator pool.", account.Address.String()))
}
return b, err
}
// submitCollation interacts with the SMC directly to add a collation header

View File

@@ -1,33 +1,38 @@
package collator
import (
"github.com/ethereum/go-ethereum/sharding/client"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/client"
cli "gopkg.in/urfave/cli.v1"
)
func NewCollatorClient(ctx *cli.Context) *client.ShardingClient {
c := client.MakeClient(ctx)
return c
type Collator interface {
Start() error
}
func CollatorStart(sclient *client.ShardingClient) error {
type collator struct {
client client.Client
}
// NewCollator creates a new collator instance.
func NewCollator(ctx *cli.Context) Collator {
return &collator{
client: client.NewClient(ctx),
}
}
// Start the main routine for a collator.
func (c *collator) Start() error {
log.Info("Starting collator client")
rpcClient, err := sclient.Start()
defer rpcClient.Close()
err := c.client.Start()
if err != nil {
return err
}
defer c.client.Close()
if err := joinCollatorPool(sclient); err != nil {
return err
}
if err := subscribeBlockHeaders(sclient); err != nil {
if err := joinCollatorPool(c.client); err != nil {
return err
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/sharding"
@@ -13,25 +12,19 @@ import (
// joinCollatorPool checks if the account is a collator in the SMC. If
// the account is not in the set, it will deposit 100ETH into contract.
func joinCollatorPool(c *client.ShardingClient) error {
if c.Ctx.GlobalBool(utils.DepositFlag.Name) {
log.Info("Joining collator pool")
txOps, err := c.CreateTXOps(sharding.DepositSize)
if err != nil {
return fmt.Errorf("unable to intiate the deposit transaction: %v", err)
}
tx, err := c.Smc.SMCTransactor.Deposit(txOps)
if err != nil {
return fmt.Errorf("unable to deposit eth and become a collator: %v", err)
}
log.Info(fmt.Sprintf("Deposited %dETH into contract with transaction hash: %s", new(big.Int).Div(sharding.DepositSize, big.NewInt(params.Ether)), tx.Hash().String()))
} else {
log.Info("Not joining collator pool")
func joinCollatorPool(c client.Client) error {
log.Info("Joining collator pool")
txOps, err := c.CreateTXOps(sharding.DepositSize)
if err != nil {
return fmt.Errorf("unable to intiate the deposit transaction: %v", err)
}
tx, err := c.SMCTransactor().Deposit(txOps)
if err != nil {
return fmt.Errorf("unable to deposit eth and become a collator: %v", err)
}
log.Info(fmt.Sprintf("Deposited %dETH into contract with transaction hash: %s", new(big.Int).Div(sharding.DepositSize, big.NewInt(params.Ether)), tx.Hash().String()))
return nil
}