diff --git a/sharding/READINGS.md b/sharding/READINGS.md index ee07bd84d8..046c72d1ce 100644 --- a/sharding/READINGS.md +++ b/sharding/READINGS.md @@ -126,7 +126,7 @@ After reading the Sharding FAQ, it is important to understand the minimal implem ## Necessary Go Knowledge & Readings - [The Go Programming Language (Only Recommended Book)](https://www.amazon.com/Programming-Language-Addison-Wesley-Professional-Computing/dp/0134190440) -- [Ethereum Development with Go] (https://goethereumbook.org) +- [Ethereum Development with Go](https://goethereumbook.org) - [How to Write Go Code](http://golang.org/doc/code.html) - [The Go Programming Language Tour](http://tour.golang.org/) - [Getting Started With Go](http://www.youtube.com/watch?v=2KmHtgtEZ1s) diff --git a/sharding/database/database.go b/sharding/database/database.go index 7b1f155ba2..7d5897986e 100644 --- a/sharding/database/database.go +++ b/sharding/database/database.go @@ -4,14 +4,53 @@ package database import ( + "fmt" "path/filepath" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" ) -// NewShardDB initializes a shardDB that writes to local disk. -func NewShardDB(dataDir string, name string) (ethdb.Database, error) { +type ShardDB struct { + dataDir string + name string + cache int + handles int + db *ethdb.LDBDatabase +} + +// NewShardDB initializes a shardDB. +func NewShardDB(dataDir string, name string) (*ShardDB, error) { // Uses default cache and handles values. // TODO: allow these arguments to be set based on cli context. - return ethdb.NewLDBDatabase(filepath.Join(dataDir, name), 16, 16) + return &ShardDB{ + dataDir: dataDir, + name: name, + cache: 16, + handles: 16, + db: nil, + }, nil +} + +// Start the shard DB service. +func (s *ShardDB) Start() { + log.Info("Starting shardDB service") + db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles) + if err != nil { + log.Error(fmt.Sprintf("Could not start shard DB: %v", err)) + return + } + s.db = db +} + +// Stop the shard DB service gracefully. +func (s *ShardDB) Stop() error { + log.Info("Stopping shardDB service") + s.db.Close() + return nil +} + +// DB returns the attached ethdb instance. +func (s *ShardDB) DB() ethdb.Database { + return s.db } diff --git a/sharding/database/database_test.go b/sharding/database/database_test.go index f1c9f4d37c..eaefa2024c 100644 --- a/sharding/database/database_test.go +++ b/sharding/database/database_test.go @@ -4,24 +4,52 @@ import ( "strconv" "testing" - "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding" + internal "github.com/ethereum/go-ethereum/sharding/internal" ) -var db ethdb.Database +// Verifies that ShardDB implements the sharding Service inteface. +var _ = sharding.Service(&ShardDB{}) + +var testDB *ShardDB func init() { - shardDB, err := NewShardDB("/tmp/datadir", "shardchaindata") + shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata") + testDB = shardDB + testDB.Start() +} + +func TestLifecycle(t *testing.T) { + h := internal.NewLogHandler(t) + log.Root().SetHandler(h) + + s, err := NewShardDB("/tmp/datadir", "shardchaindb") if err != nil { - panic(err) + t.Fatalf("Could not initialize a new sb: %v", err) + } + + s.Start() + h.VerifyLogMsg("Starting shardDB service") + // ethdb.NewLDBDatabase logs the following + h.VerifyLogMsg("Allocated cache and file handles") + + s.Stop() + h.VerifyLogMsg("Stopping shardDB service") + + // Access DB after it's stopped, this should fail + _, err = s.db.Get([]byte("ralph merkle")) + + if err.Error() != "leveldb: closed" { + t.Fatalf("shardDB close function did not work") } - db = shardDB } // Testing the concurrency of the shardDB with multiple processes attempting to write. func Test_DBConcurrent(t *testing.T) { for i := 0; i < 100; i++ { go func(val string) { - if err := db.Put([]byte("ralph merkle"), []byte(val)); err != nil { + if err := testDB.db.Put([]byte("ralph merkle"), []byte(val)); err != nil { t.Errorf("could not save value in db: %v", err) } }(strconv.Itoa(i)) @@ -29,7 +57,7 @@ func Test_DBConcurrent(t *testing.T) { } func Test_DBPut(t *testing.T) { - if err := db.Put([]byte("ralph merkle"), []byte{1, 2, 3}); err != nil { + if err := testDB.db.Put([]byte("ralph merkle"), []byte{1, 2, 3}); err != nil { t.Errorf("could not save value in db: %v", err) } } @@ -37,11 +65,11 @@ func Test_DBPut(t *testing.T) { func Test_DBHas(t *testing.T) { key := []byte("ralph merkle") - if err := db.Put(key, []byte{1, 2, 3}); err != nil { + if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil { t.Fatalf("could not save value in db: %v", err) } - has, err := db.Has(key) + has, err := testDB.db.Has(key) if err != nil { t.Errorf("could not check if db has key: %v", err) } @@ -50,7 +78,7 @@ func Test_DBHas(t *testing.T) { } key2 := []byte{} - has2, err := db.Has(key2) + has2, err := testDB.db.Has(key2) if err != nil { t.Errorf("could not check if db has key: %v", err) } @@ -62,11 +90,11 @@ func Test_DBHas(t *testing.T) { func Test_DBGet(t *testing.T) { key := []byte("ralph merkle") - if err := db.Put(key, []byte{1, 2, 3}); err != nil { + if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil { t.Fatalf("could not save value in db: %v", err) } - val, err := db.Get(key) + val, err := testDB.db.Get(key) if err != nil { t.Errorf("get failed: %v", err) } @@ -75,7 +103,7 @@ func Test_DBGet(t *testing.T) { } key2 := []byte{} - val2, err := db.Get(key2) + val2, err := testDB.db.Get(key2) if len(val2) != 0 { t.Errorf("non-existent key should not have a value. key=%v, value=%v", key2, val2) } @@ -84,11 +112,11 @@ func Test_DBGet(t *testing.T) { func Test_DBDelete(t *testing.T) { key := []byte("ralph merkle") - if err := db.Put(key, []byte{1, 2, 3}); err != nil { + if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil { t.Fatalf("could not save value in db: %v", err) } - if err := db.Delete(key); err != nil { + if err := testDB.db.Delete(key); err != nil { t.Errorf("could not delete key: %v", key) } } diff --git a/sharding/internal/log_helper.go b/sharding/internal/log_helper.go index cfbd32b172..374ba8f4c8 100644 --- a/sharding/internal/log_helper.go +++ b/sharding/internal/log_helper.go @@ -43,6 +43,6 @@ func (h *LogHandler) VerifyLogMsg(str string) { h.t.Error("Expected a log, but there were none!") } if l := h.Pop(); l.Msg != str { - h.t.Errorf("Unexpected log: %v. Wanted: %s", l, str) + h.t.Errorf("Unexpected log: %v. Wanted: %s", l.Msg, str) } } diff --git a/sharding/node/backend.go b/sharding/node/backend.go index ed10c5e633..d4f2285da9 100644 --- a/sharding/node/backend.go +++ b/sharding/node/backend.go @@ -13,7 +13,6 @@ import ( "syscall" "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/log" @@ -36,11 +35,10 @@ const shardChainDbName = "shardchaindata" // it contains APIs and fields that handle the different components of the sharded // Ethereum network. type ShardEthereum struct { - shardConfig *params.Config // Holds necessary information to configure shards. - txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed. - actor sharding.Actor // Either notary, proposer, or observer. - shardChainDb ethdb.Database // Access to the persistent db to store shard data. - eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors. + shardConfig *params.Config // Holds necessary information to configure shards. + txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed. + actor sharding.Actor // Either notary, proposer, or observer. + eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors. // Lifecycle and service stores. services map[reflect.Type]sharding.Service // Service registry. @@ -56,22 +54,12 @@ func New(ctx *cli.Context) (*ShardEthereum, error) { stop: make(chan struct{}), } - path := node.DefaultDataDir() - if ctx.GlobalIsSet(utils.DataDirFlag.Name) { - path = ctx.GlobalString(utils.DataDirFlag.Name) - } - - shardChainDb, err := database.NewShardDB(path, shardChainDbName) - if err != nil { - return nil, err - } - // Configure shardConfig by loading the default. shardEthereum.shardConfig = params.DefaultConfig - // Adds the initialized shardChainDb to the ShardEthereum instance. - // TODO: Move out of here! - shardEthereum.shardChainDb = shardChainDb + if err := shardEthereum.registerShardChainDB(ctx); err != nil { + return nil, err + } if err := shardEthereum.registerP2P(); err != nil { return nil, err @@ -175,6 +163,17 @@ func (s *ShardEthereum) Register(constructor sharding.ServiceConstructor) error return nil } +// registerShardChainDB attaches a LevelDB wrapped object to the shardEthereum instance. +func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error { + path := node.DefaultDataDir() + if ctx.GlobalIsSet(utils.DataDirFlag.Name) { + path = ctx.GlobalString(utils.DataDirFlag.Name) + } + return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) { + return database.NewShardDB(path, shardChainDbName) + }) +} + // registerP2P attaches a p2p server to the ShardEthereum instance. // TODO: Design this p2p service and the methods it should expose as well as // its event loop. @@ -230,14 +229,16 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string ctx.RetrieveService(&p2p) var smcClient *mainchain.SMCClient ctx.RetrieveService(&smcClient) + var shardChainDB *database.ShardDB + ctx.RetrieveService(&shardChainDB) if actor == "notary" { - return notary.NewNotary(config, smcClient, p2p, s.shardChainDb) + return notary.NewNotary(config, smcClient, p2p, shardChainDB) } else if actor == "proposer" { var txPool *txpool.TXPool ctx.RetrieveService(&txPool) - return proposer.NewProposer(config, smcClient, p2p, txPool, s.shardChainDb, shardID) + return proposer.NewProposer(config, smcClient, p2p, txPool, shardChainDB, shardID) } - return observer.NewObserver(p2p, s.shardChainDb, shardID) + return observer.NewObserver(p2p, shardChainDB.DB(), shardID) }) } diff --git a/sharding/notary/service.go b/sharding/notary/service.go index eae73a96eb..ef0da97fc1 100644 --- a/sharding/notary/service.go +++ b/sharding/notary/service.go @@ -5,8 +5,8 @@ package notary import ( "fmt" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding/database" "github.com/ethereum/go-ethereum/sharding/mainchain" "github.com/ethereum/go-ethereum/sharding/p2p" "github.com/ethereum/go-ethereum/sharding/params" @@ -19,11 +19,11 @@ type Notary struct { config *params.Config smcClient *mainchain.SMCClient p2p *p2p.Server - shardChainDb ethdb.Database + shardChainDb *database.ShardDB } // NewNotary creates a new notary instance. -func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, shardChainDb ethdb.Database) (*Notary, error) { +func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, shardChainDb *database.ShardDB) (*Notary, error) { return &Notary{config, smcClient, p2p, shardChainDb}, nil } diff --git a/sharding/observer/service.go b/sharding/observer/service.go index 5671c796b2..f82702b89e 100644 --- a/sharding/observer/service.go +++ b/sharding/observer/service.go @@ -3,10 +3,13 @@ package observer import ( + "context" "fmt" + "math/big" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding" "github.com/ethereum/go-ethereum/sharding/p2p" ) @@ -14,23 +17,30 @@ import ( // in a sharded system. Must satisfy the Service interface defined in // sharding/service.go. type Observer struct { - p2p *p2p.Server - shardChainDb ethdb.Database - shardID int + p2p *p2p.Server + shard *sharding.Shard + ctx context.Context + cancel context.CancelFunc } -// NewObserver creates a new observer instance. +// NewObserver creates a struct instance of a observer service, +// it will have access to a p2p server and a shardChainDb. func NewObserver(p2p *p2p.Server, shardChainDb ethdb.Database, shardID int) (*Observer, error) { - return &Observer{p2p, shardChainDb, shardID}, nil + ctx, cancel := context.WithCancel(context.Background()) + shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDb) + return &Observer{p2p, shard, ctx, cancel}, nil } -// Start the main routine for an observer. +// Start the main loop for observer service. func (o *Observer) Start() { - log.Info(fmt.Sprintf("Starting observer service in shard %d", o.shardID)) + log.Info(fmt.Sprintf("Starting observer service")) } -// Stop the main loop for observing the shard network. +// Stop the main loop for observer service. func (o *Observer) Stop() error { - log.Info(fmt.Sprintf("Starting observer service in shard %d", o.shardID)) + // Triggers a cancel call in the service's context which shuts down every goroutine + // in this service. + defer o.cancel() + log.Info(fmt.Sprintf("Stopping observer service")) return nil } diff --git a/sharding/observer/service_test.go b/sharding/observer/service_test.go index 51edff1015..889145e62d 100644 --- a/sharding/observer/service_test.go +++ b/sharding/observer/service_test.go @@ -1,6 +1,46 @@ package observer -import "github.com/ethereum/go-ethereum/sharding" +import ( + "testing" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding" + "github.com/ethereum/go-ethereum/sharding/database" + internal "github.com/ethereum/go-ethereum/sharding/internal" + "github.com/ethereum/go-ethereum/sharding/p2p" +) // Verifies that Observer implements the Actor interface. var _ = sharding.Actor(&Observer{}) + +func TestStartStop(t *testing.T) { + h := internal.NewLogHandler(t) + log.Root().SetHandler(h) + + server, err := p2p.NewServer() + if err != nil { + t.Fatalf("Unable to setup p2p server: %v", err) + } + shardChainDB := database.NewShardKV() + shardID := 0 + + observer, err := NewObserver(server, shardChainDB, shardID) + if err != nil { + t.Fatalf("Unable to set up observer service: %v", err) + } + + observer.Start() + + h.VerifyLogMsg("Starting observer service") + + err = observer.Stop() + if err != nil { + t.Fatalf("Unable to stop observer service: %v", err) + } + + h.VerifyLogMsg("Stopping observer service") + + if observer.ctx.Err() == nil { + t.Errorf("Context was not cancelled") + } +} diff --git a/sharding/proposer/service.go b/sharding/proposer/service.go index 754e4ab6c9..c550f4a2d7 100644 --- a/sharding/proposer/service.go +++ b/sharding/proposer/service.go @@ -4,14 +4,13 @@ package proposer import ( "context" - "crypto/rand" "fmt" "math/big" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/sharding/database" "github.com/ethereum/go-ethereum/sharding/mainchain" "github.com/ethereum/go-ethereum/sharding/p2p" "github.com/ethereum/go-ethereum/sharding/params" @@ -26,15 +25,28 @@ type Proposer struct { client *mainchain.SMCClient p2p *p2p.Server txpool *txpool.TXPool - shardChainDb ethdb.Database + txpoolSub event.Subscription + shardChainDb *database.ShardDB shardID int + ctx context.Context + cancel context.CancelFunc } // NewProposer creates a struct instance of a proposer service. // It will have access to a mainchain client, a p2p network, // and a shard transaction pool. -func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDb ethdb.Database, shardID int) (*Proposer, error) { - return &Proposer{config, client, p2p, txpool, shardChainDb, shardID}, nil +func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDb *database.ShardDB, shardID int) (*Proposer, error) { + ctx, cancel := context.WithCancel(context.Background()) + return &Proposer{ + config, + client, + p2p, + txpool, + nil, + shardChainDb, + shardID, + ctx, + cancel}, nil } // Start the main loop for proposing collations. @@ -46,42 +58,55 @@ func (p *Proposer) Start() { // Stop the main loop for proposing collations. func (p *Proposer) Stop() error { log.Info(fmt.Sprintf("Stopping proposer service in shard %d", p.shardID)) + defer p.cancel() + p.txpoolSub.Unsubscribe() return nil } +// proposeCollations listens to the transaction feed and submits collations over an interval. func (p *Proposer) proposeCollations() { + requests := make(chan *types.Transaction) + p.txpoolSub = p.txpool.TransactionsFeed().Subscribe(requests) - // TODO: Receive TXs from shard TX generator or TXpool (Github Issues 153 and 161) - var txs []*types.Transaction - for i := 0; i < 10; i++ { - data := make([]byte, 1024) - rand.Read(data) - txs = append(txs, types.NewTransaction(0, common.HexToAddress("0x0"), - nil, 0, nil, data)) + for { + select { + case tx := <-requests: + log.Info(fmt.Sprintf("Received transaction: %x", tx.Hash())) + if err := p.createCollation(p.ctx, []*types.Transaction{tx}); err != nil { + log.Error(fmt.Sprintf("Create collation failed: %v", err)) + } + case <-p.ctx.Done(): + log.Error("Proposer context closed, exiting goroutine") + return + case err := <-p.txpoolSub.Err(): + log.Error(fmt.Sprintf("Subscriber closed: %v", err)) + return + } } +} +func (p *Proposer) createCollation(ctx context.Context, txs []*types.Transaction) error { // Get current block number. - blockNumber, err := p.client.ChainReader().BlockByNumber(context.Background(), nil) + blockNumber, err := p.client.ChainReader().BlockByNumber(ctx, nil) if err != nil { - log.Error(fmt.Sprintf("Could not fetch current block number: %v", err)) - return + return err } period := new(big.Int).Div(blockNumber.Number(), big.NewInt(p.config.PeriodLength)) // Create collation. collation, err := createCollation(p.client, p.client.Account(), p.client, big.NewInt(int64(p.shardID)), period, txs) if err != nil { - log.Error(fmt.Sprintf("Could not create collation: %v", err)) - return + return err } // Check SMC if we can submit header before addHeader canAdd, err := checkHeaderAdded(p.client, big.NewInt(int64(p.shardID)), period) if err != nil { - log.Error(fmt.Sprintf("Could not check if we can submit header: %v", err)) - return + return err } if canAdd { addHeader(p.client, collation) } + + return nil } diff --git a/sharding/proposer/service_test.go b/sharding/proposer/service_test.go index c6155adfa2..02871885e6 100644 --- a/sharding/proposer/service_test.go +++ b/sharding/proposer/service_test.go @@ -1,11 +1,10 @@ package proposer import ( + "crypto/rand" "math/big" "testing" - "crypto/rand" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" diff --git a/sharding/txpool/service.go b/sharding/txpool/service.go index dab3450265..3fdb146db0 100644 --- a/sharding/txpool/service.go +++ b/sharding/txpool/service.go @@ -2,27 +2,60 @@ package txpool import ( + "crypto/rand" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/sharding/p2p" ) // TXPool handles a transaction pool for a sharded system. type TXPool struct { - p2p *p2p.Server + p2p *p2p.Server + transactionsFeed *event.Feed + ticker *time.Ticker } // NewTXPool creates a new observer instance. func NewTXPool(p2p *p2p.Server) (*TXPool, error) { - return &TXPool{p2p}, nil + return &TXPool{p2p: p2p, transactionsFeed: new(event.Feed)}, nil } // Start the main routine for a shard transaction pool. func (p *TXPool) Start() { log.Info("Starting shard txpool service") + go p.sendTestTransaction() } // Stop the main loop for a transaction pool in the shard network. func (p *TXPool) Stop() error { log.Info("Stopping shard txpool service") + p.ticker.Stop() return nil } + +func (p *TXPool) TransactionsFeed() *event.Feed { + return p.transactionsFeed +} + +// sendTestTransaction sends a transaction with random bytes over a 5 second interval. +// This method is for testing purposes only, and will be replaced by a more functional CLI tool. +func (p *TXPool) sendTestTransaction() { + p.ticker = time.NewTicker(5 * time.Second) + + for range p.ticker.C { + tx := createTestTransaction() + nsent := p.transactionsFeed.Send(tx) + log.Info(fmt.Sprintf("Sent transaction %x to %d subscribers", tx.Hash(), nsent)) + } +} + +func createTestTransaction() *types.Transaction { + data := make([]byte, 1024) + rand.Read(data) + return types.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data) +}