Random transaction generator (#171)

sharding: random transaction generator
Former-commit-id: b41891c474372ab54cbf1e3fa34dc7399f42c6d4 [formerly 69b7da173694490fa44748fe3cb3b2aa84f36b46]
Former-commit-id: b642be5e1b7b4aad59011983bacda4a9b1166463
This commit is contained in:
Yutaro Mori
2018-06-20 12:59:02 +09:00
committed by terence tsao
parent ad291e93f6
commit 346b9ae8aa
11 changed files with 254 additions and 79 deletions

View File

@@ -126,7 +126,7 @@ After reading the Sharding FAQ, it is important to understand the minimal implem
## Necessary Go Knowledge & Readings
- [The Go Programming Language (Only Recommended Book)](https://www.amazon.com/Programming-Language-Addison-Wesley-Professional-Computing/dp/0134190440)
- [Ethereum Development with Go] (https://goethereumbook.org)
- [Ethereum Development with Go](https://goethereumbook.org)
- [How to Write Go Code](http://golang.org/doc/code.html)
- [The Go Programming Language Tour](http://tour.golang.org/)
- [Getting Started With Go](http://www.youtube.com/watch?v=2KmHtgtEZ1s)

View File

@@ -4,14 +4,53 @@
package database
import (
"fmt"
"path/filepath"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
// NewShardDB initializes a shardDB that writes to local disk.
func NewShardDB(dataDir string, name string) (ethdb.Database, error) {
type ShardDB struct {
dataDir string
name string
cache int
handles int
db *ethdb.LDBDatabase
}
// NewShardDB initializes a shardDB.
func NewShardDB(dataDir string, name string) (*ShardDB, error) {
// Uses default cache and handles values.
// TODO: allow these arguments to be set based on cli context.
return ethdb.NewLDBDatabase(filepath.Join(dataDir, name), 16, 16)
return &ShardDB{
dataDir: dataDir,
name: name,
cache: 16,
handles: 16,
db: nil,
}, nil
}
// Start the shard DB service.
func (s *ShardDB) Start() {
log.Info("Starting shardDB service")
db, err := ethdb.NewLDBDatabase(filepath.Join(s.dataDir, s.name), s.cache, s.handles)
if err != nil {
log.Error(fmt.Sprintf("Could not start shard DB: %v", err))
return
}
s.db = db
}
// Stop the shard DB service gracefully.
func (s *ShardDB) Stop() error {
log.Info("Stopping shardDB service")
s.db.Close()
return nil
}
// DB returns the attached ethdb instance.
func (s *ShardDB) DB() ethdb.Database {
return s.db
}

View File

@@ -4,24 +4,52 @@ import (
"strconv"
"testing"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
internal "github.com/ethereum/go-ethereum/sharding/internal"
)
var db ethdb.Database
// Verifies that ShardDB implements the sharding Service inteface.
var _ = sharding.Service(&ShardDB{})
var testDB *ShardDB
func init() {
shardDB, err := NewShardDB("/tmp/datadir", "shardchaindata")
shardDB, _ := NewShardDB("/tmp/datadir", "shardchaindata")
testDB = shardDB
testDB.Start()
}
func TestLifecycle(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
s, err := NewShardDB("/tmp/datadir", "shardchaindb")
if err != nil {
panic(err)
t.Fatalf("Could not initialize a new sb: %v", err)
}
s.Start()
h.VerifyLogMsg("Starting shardDB service")
// ethdb.NewLDBDatabase logs the following
h.VerifyLogMsg("Allocated cache and file handles")
s.Stop()
h.VerifyLogMsg("Stopping shardDB service")
// Access DB after it's stopped, this should fail
_, err = s.db.Get([]byte("ralph merkle"))
if err.Error() != "leveldb: closed" {
t.Fatalf("shardDB close function did not work")
}
db = shardDB
}
// Testing the concurrency of the shardDB with multiple processes attempting to write.
func Test_DBConcurrent(t *testing.T) {
for i := 0; i < 100; i++ {
go func(val string) {
if err := db.Put([]byte("ralph merkle"), []byte(val)); err != nil {
if err := testDB.db.Put([]byte("ralph merkle"), []byte(val)); err != nil {
t.Errorf("could not save value in db: %v", err)
}
}(strconv.Itoa(i))
@@ -29,7 +57,7 @@ func Test_DBConcurrent(t *testing.T) {
}
func Test_DBPut(t *testing.T) {
if err := db.Put([]byte("ralph merkle"), []byte{1, 2, 3}); err != nil {
if err := testDB.db.Put([]byte("ralph merkle"), []byte{1, 2, 3}); err != nil {
t.Errorf("could not save value in db: %v", err)
}
}
@@ -37,11 +65,11 @@ func Test_DBPut(t *testing.T) {
func Test_DBHas(t *testing.T) {
key := []byte("ralph merkle")
if err := db.Put(key, []byte{1, 2, 3}); err != nil {
if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil {
t.Fatalf("could not save value in db: %v", err)
}
has, err := db.Has(key)
has, err := testDB.db.Has(key)
if err != nil {
t.Errorf("could not check if db has key: %v", err)
}
@@ -50,7 +78,7 @@ func Test_DBHas(t *testing.T) {
}
key2 := []byte{}
has2, err := db.Has(key2)
has2, err := testDB.db.Has(key2)
if err != nil {
t.Errorf("could not check if db has key: %v", err)
}
@@ -62,11 +90,11 @@ func Test_DBHas(t *testing.T) {
func Test_DBGet(t *testing.T) {
key := []byte("ralph merkle")
if err := db.Put(key, []byte{1, 2, 3}); err != nil {
if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil {
t.Fatalf("could not save value in db: %v", err)
}
val, err := db.Get(key)
val, err := testDB.db.Get(key)
if err != nil {
t.Errorf("get failed: %v", err)
}
@@ -75,7 +103,7 @@ func Test_DBGet(t *testing.T) {
}
key2 := []byte{}
val2, err := db.Get(key2)
val2, err := testDB.db.Get(key2)
if len(val2) != 0 {
t.Errorf("non-existent key should not have a value. key=%v, value=%v", key2, val2)
}
@@ -84,11 +112,11 @@ func Test_DBGet(t *testing.T) {
func Test_DBDelete(t *testing.T) {
key := []byte("ralph merkle")
if err := db.Put(key, []byte{1, 2, 3}); err != nil {
if err := testDB.db.Put(key, []byte{1, 2, 3}); err != nil {
t.Fatalf("could not save value in db: %v", err)
}
if err := db.Delete(key); err != nil {
if err := testDB.db.Delete(key); err != nil {
t.Errorf("could not delete key: %v", key)
}
}

View File

@@ -43,6 +43,6 @@ func (h *LogHandler) VerifyLogMsg(str string) {
h.t.Error("Expected a log, but there were none!")
}
if l := h.Pop(); l.Msg != str {
h.t.Errorf("Unexpected log: %v. Wanted: %s", l, str)
h.t.Errorf("Unexpected log: %v. Wanted: %s", l.Msg, str)
}
}

View File

@@ -13,7 +13,6 @@ import (
"syscall"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/log"
@@ -36,11 +35,10 @@ const shardChainDbName = "shardchaindata"
// it contains APIs and fields that handle the different components of the sharded
// Ethereum network.
type ShardEthereum struct {
shardConfig *params.Config // Holds necessary information to configure shards.
txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed.
actor sharding.Actor // Either notary, proposer, or observer.
shardChainDb ethdb.Database // Access to the persistent db to store shard data.
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
shardConfig *params.Config // Holds necessary information to configure shards.
txPool *txpool.TXPool // Defines the sharding-specific txpool. To be designed.
actor sharding.Actor // Either notary, proposer, or observer.
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
// Lifecycle and service stores.
services map[reflect.Type]sharding.Service // Service registry.
@@ -56,22 +54,12 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
stop: make(chan struct{}),
}
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
}
shardChainDb, err := database.NewShardDB(path, shardChainDbName)
if err != nil {
return nil, err
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
// Adds the initialized shardChainDb to the ShardEthereum instance.
// TODO: Move out of here!
shardEthereum.shardChainDb = shardChainDb
if err := shardEthereum.registerShardChainDB(ctx); err != nil {
return nil, err
}
if err := shardEthereum.registerP2P(); err != nil {
return nil, err
@@ -175,6 +163,17 @@ func (s *ShardEthereum) Register(constructor sharding.ServiceConstructor) error
return nil
}
// registerShardChainDB attaches a LevelDB wrapped object to the shardEthereum instance.
func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
path = ctx.GlobalString(utils.DataDirFlag.Name)
}
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
return database.NewShardDB(path, shardChainDbName)
})
}
// registerP2P attaches a p2p server to the ShardEthereum instance.
// TODO: Design this p2p service and the methods it should expose as well as
// its event loop.
@@ -230,14 +229,16 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
var shardChainDB *database.ShardDB
ctx.RetrieveService(&shardChainDB)
if actor == "notary" {
return notary.NewNotary(config, smcClient, p2p, s.shardChainDb)
return notary.NewNotary(config, smcClient, p2p, shardChainDB)
} else if actor == "proposer" {
var txPool *txpool.TXPool
ctx.RetrieveService(&txPool)
return proposer.NewProposer(config, smcClient, p2p, txPool, s.shardChainDb, shardID)
return proposer.NewProposer(config, smcClient, p2p, txPool, shardChainDB, shardID)
}
return observer.NewObserver(p2p, s.shardChainDb, shardID)
return observer.NewObserver(p2p, shardChainDB.DB(), shardID)
})
}

View File

@@ -5,8 +5,8 @@ package notary
import (
"fmt"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/database"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
@@ -19,11 +19,11 @@ type Notary struct {
config *params.Config
smcClient *mainchain.SMCClient
p2p *p2p.Server
shardChainDb ethdb.Database
shardChainDb *database.ShardDB
}
// NewNotary creates a new notary instance.
func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, shardChainDb ethdb.Database) (*Notary, error) {
func NewNotary(config *params.Config, smcClient *mainchain.SMCClient, p2p *p2p.Server, shardChainDb *database.ShardDB) (*Notary, error) {
return &Notary{config, smcClient, p2p, shardChainDb}, nil
}

View File

@@ -3,10 +3,13 @@
package observer
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/p2p"
)
@@ -14,23 +17,30 @@ import (
// in a sharded system. Must satisfy the Service interface defined in
// sharding/service.go.
type Observer struct {
p2p *p2p.Server
shardChainDb ethdb.Database
shardID int
p2p *p2p.Server
shard *sharding.Shard
ctx context.Context
cancel context.CancelFunc
}
// NewObserver creates a new observer instance.
// NewObserver creates a struct instance of a observer service,
// it will have access to a p2p server and a shardChainDb.
func NewObserver(p2p *p2p.Server, shardChainDb ethdb.Database, shardID int) (*Observer, error) {
return &Observer{p2p, shardChainDb, shardID}, nil
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDb)
return &Observer{p2p, shard, ctx, cancel}, nil
}
// Start the main routine for an observer.
// Start the main loop for observer service.
func (o *Observer) Start() {
log.Info(fmt.Sprintf("Starting observer service in shard %d", o.shardID))
log.Info(fmt.Sprintf("Starting observer service"))
}
// Stop the main loop for observing the shard network.
// Stop the main loop for observer service.
func (o *Observer) Stop() error {
log.Info(fmt.Sprintf("Starting observer service in shard %d", o.shardID))
// Triggers a cancel call in the service's context which shuts down every goroutine
// in this service.
defer o.cancel()
log.Info(fmt.Sprintf("Stopping observer service"))
return nil
}

View File

@@ -1,6 +1,46 @@
package observer
import "github.com/ethereum/go-ethereum/sharding"
import (
"testing"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/database"
internal "github.com/ethereum/go-ethereum/sharding/internal"
"github.com/ethereum/go-ethereum/sharding/p2p"
)
// Verifies that Observer implements the Actor interface.
var _ = sharding.Actor(&Observer{})
func TestStartStop(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
shardChainDB := database.NewShardKV()
shardID := 0
observer, err := NewObserver(server, shardChainDB, shardID)
if err != nil {
t.Fatalf("Unable to set up observer service: %v", err)
}
observer.Start()
h.VerifyLogMsg("Starting observer service")
err = observer.Stop()
if err != nil {
t.Fatalf("Unable to stop observer service: %v", err)
}
h.VerifyLogMsg("Stopping observer service")
if observer.ctx.Err() == nil {
t.Errorf("Context was not cancelled")
}
}

View File

@@ -4,14 +4,13 @@ package proposer
import (
"context"
"crypto/rand"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/database"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
@@ -26,15 +25,28 @@ type Proposer struct {
client *mainchain.SMCClient
p2p *p2p.Server
txpool *txpool.TXPool
shardChainDb ethdb.Database
txpoolSub event.Subscription
shardChainDb *database.ShardDB
shardID int
ctx context.Context
cancel context.CancelFunc
}
// NewProposer creates a struct instance of a proposer service.
// It will have access to a mainchain client, a p2p network,
// and a shard transaction pool.
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDb ethdb.Database, shardID int) (*Proposer, error) {
return &Proposer{config, client, p2p, txpool, shardChainDb, shardID}, nil
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDb *database.ShardDB, shardID int) (*Proposer, error) {
ctx, cancel := context.WithCancel(context.Background())
return &Proposer{
config,
client,
p2p,
txpool,
nil,
shardChainDb,
shardID,
ctx,
cancel}, nil
}
// Start the main loop for proposing collations.
@@ -46,42 +58,55 @@ func (p *Proposer) Start() {
// Stop the main loop for proposing collations.
func (p *Proposer) Stop() error {
log.Info(fmt.Sprintf("Stopping proposer service in shard %d", p.shardID))
defer p.cancel()
p.txpoolSub.Unsubscribe()
return nil
}
// proposeCollations listens to the transaction feed and submits collations over an interval.
func (p *Proposer) proposeCollations() {
requests := make(chan *types.Transaction)
p.txpoolSub = p.txpool.TransactionsFeed().Subscribe(requests)
// TODO: Receive TXs from shard TX generator or TXpool (Github Issues 153 and 161)
var txs []*types.Transaction
for i := 0; i < 10; i++ {
data := make([]byte, 1024)
rand.Read(data)
txs = append(txs, types.NewTransaction(0, common.HexToAddress("0x0"),
nil, 0, nil, data))
for {
select {
case tx := <-requests:
log.Info(fmt.Sprintf("Received transaction: %x", tx.Hash()))
if err := p.createCollation(p.ctx, []*types.Transaction{tx}); err != nil {
log.Error(fmt.Sprintf("Create collation failed: %v", err))
}
case <-p.ctx.Done():
log.Error("Proposer context closed, exiting goroutine")
return
case err := <-p.txpoolSub.Err():
log.Error(fmt.Sprintf("Subscriber closed: %v", err))
return
}
}
}
func (p *Proposer) createCollation(ctx context.Context, txs []*types.Transaction) error {
// Get current block number.
blockNumber, err := p.client.ChainReader().BlockByNumber(context.Background(), nil)
blockNumber, err := p.client.ChainReader().BlockByNumber(ctx, nil)
if err != nil {
log.Error(fmt.Sprintf("Could not fetch current block number: %v", err))
return
return err
}
period := new(big.Int).Div(blockNumber.Number(), big.NewInt(p.config.PeriodLength))
// Create collation.
collation, err := createCollation(p.client, p.client.Account(), p.client, big.NewInt(int64(p.shardID)), period, txs)
if err != nil {
log.Error(fmt.Sprintf("Could not create collation: %v", err))
return
return err
}
// Check SMC if we can submit header before addHeader
canAdd, err := checkHeaderAdded(p.client, big.NewInt(int64(p.shardID)), period)
if err != nil {
log.Error(fmt.Sprintf("Could not check if we can submit header: %v", err))
return
return err
}
if canAdd {
addHeader(p.client, collation)
}
return nil
}

View File

@@ -1,11 +1,10 @@
package proposer
import (
"crypto/rand"
"math/big"
"testing"
"crypto/rand"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"

View File

@@ -2,27 +2,60 @@
package txpool
import (
"crypto/rand"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/p2p"
)
// TXPool handles a transaction pool for a sharded system.
type TXPool struct {
p2p *p2p.Server
p2p *p2p.Server
transactionsFeed *event.Feed
ticker *time.Ticker
}
// NewTXPool creates a new observer instance.
func NewTXPool(p2p *p2p.Server) (*TXPool, error) {
return &TXPool{p2p}, nil
return &TXPool{p2p: p2p, transactionsFeed: new(event.Feed)}, nil
}
// Start the main routine for a shard transaction pool.
func (p *TXPool) Start() {
log.Info("Starting shard txpool service")
go p.sendTestTransaction()
}
// Stop the main loop for a transaction pool in the shard network.
func (p *TXPool) Stop() error {
log.Info("Stopping shard txpool service")
p.ticker.Stop()
return nil
}
func (p *TXPool) TransactionsFeed() *event.Feed {
return p.transactionsFeed
}
// sendTestTransaction sends a transaction with random bytes over a 5 second interval.
// This method is for testing purposes only, and will be replaced by a more functional CLI tool.
func (p *TXPool) sendTestTransaction() {
p.ticker = time.NewTicker(5 * time.Second)
for range p.ticker.C {
tx := createTestTransaction()
nsent := p.transactionsFeed.Send(tx)
log.Info(fmt.Sprintf("Sent transaction %x to %d subscribers", tx.Hash(), nsent))
}
}
func createTestTransaction() *types.Transaction {
data := make([]byte, 1024)
rand.Read(data)
return types.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data)
}