sharding: Test for Node Fetch and Register Service, 100% Simulator Coverage (#254)

Former-commit-id: 2fe6fdb393917f90c9a8a0c6dfb866738736803e [formerly 03d3519c59eb7f78e20ff08c081031d9b1f673bc]
Former-commit-id: a1cba8b7ec0517748d5df0bab458b2466f7b6329
This commit is contained in:
Raul Jordan
2018-07-13 11:47:57 -05:00
committed by GitHub
parent 0444ee81c4
commit 739f34f26c
21 changed files with 265 additions and 72 deletions

View File

@@ -30,6 +30,7 @@ go_test(
srcs = ["backend_test.go"],
embed = [":go_default_library"],
deps = [
"//sharding/params:go_default_library",
"//sharding/types:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],

View File

@@ -260,14 +260,14 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
return err
}
if actor == "notary" {
switch actor {
case "notary":
not, err := notary.NewNotary(config, client, shardp2p, shardChainDB)
if err != nil {
return fmt.Errorf("could not register notary service: %v", err)
}
return s.registerService(not)
} else if actor == "proposer" {
case "proposer":
var pool *txpool.TXPool
if err := s.fetchService(&pool); err != nil {
return err
@@ -278,12 +278,19 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
return fmt.Errorf("could not register proposer service: %v", err)
}
return s.registerService(prop)
case "simulator":
sim, err := simulator.NewSimulator(config, client, shardp2p, shardID, 15) // 15 second delay between simulator requests.
if err != nil {
return fmt.Errorf("could not register simulator service: %v", err)
}
return s.registerService(sim)
default:
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID, sync, client)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
}
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID, sync, client)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
}
func (s *ShardEthereum) registerSimulatorService(actorFlag string, config *params.Config, shardID int) error {

View File

@@ -2,8 +2,10 @@ package node
import (
"flag"
"reflect"
"testing"
"github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/urfave/cli"
@@ -12,6 +14,25 @@ import (
// Verifies that ShardEthereum implements the Node interface.
var _ = types.Node(&ShardEthereum{})
type mockService struct{}
type secondMockService struct{}
func (m *mockService) Start() {
return
}
func (m *mockService) Stop() error {
return nil
}
func (s *secondMockService) Start() {
return
}
func (s *secondMockService) Stop() error {
return nil
}
// Test that the sharding node can build with default flag values.
func TestNode_Builds(t *testing.T) {
app := cli.NewApp()
@@ -24,3 +45,92 @@ func TestNode_Builds(t *testing.T) {
t.Fatalf("Failed to create ShardEthereum: %v", err)
}
}
func TestRegisterServiceTwice(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
// checks if first service was indeed registered
if len(shardEthereum.serviceTypes) != 1 {
t.Fatalf("service types slice should contain 1 service, contained %v", len(shardEthereum.serviceTypes))
}
if err := shardEthereum.registerService(m); err == nil {
t.Errorf("should not be able to register a service twice, got nil error")
}
}
func TestRegisterDifferentServices(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
s := &secondMockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := shardEthereum.registerService(s); err != nil {
t.Fatalf("failed to register second service")
}
if len(shardEthereum.serviceTypes) != 2 {
t.Errorf("service types slice should contain 2 services, contained %v", len(shardEthereum.serviceTypes))
}
if _, exists := shardEthereum.services[reflect.TypeOf(m)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(m))
}
if _, exists := shardEthereum.services[reflect.TypeOf(s)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(s))
}
}
func TestFetchService(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := shardEthereum.fetchService(*m); err == nil {
t.Errorf("passing in a value should throw an error, received nil error")
}
var s *secondMockService
if err := shardEthereum.fetchService(&s); err == nil {
t.Errorf("fetching an unregistered service should return an error, got nil")
}
var m2 *mockService
if err := shardEthereum.fetchService(&m2); err != nil {
t.Fatalf("failed to fetch service")
}
if m2 != m {
t.Errorf("pointers were not equal, instead got %p, %p", m2, m)
}
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/prysmaticlabs/geth-sharding/sharding/internal"
shardparams "github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
)
var (

View File

@@ -5,5 +5,8 @@ go_library(
srcs = ["messages.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages",
visibility = ["//sharding:__subpackages__"],
deps = ["@com_github_ethereum_go_ethereum//common:go_default_library"],
deps = [
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
],
)

View File

@@ -4,6 +4,7 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// CollationBodyRequest defines a p2p message being sent over subscription feeds
@@ -22,3 +23,9 @@ type CollationBodyResponse struct {
HeaderHash *common.Hash
Body []byte
}
// TransactionBroadcast defines the p2p message broadcast from simulators
// to the rest of the actors for transactions to be included in collation.
type TransactionBroadcast struct {
Transaction *types.Transaction
}

View File

@@ -12,6 +12,7 @@ go_library(
"//sharding/database:go_default_library",
"//sharding/mainchain:go_default_library",
"//sharding/p2p:go_default_library",
"//sharding/p2p/messages:go_default_library",
"//sharding/params:go_default_library",
"//sharding/syncer:go_default_library",
"//sharding/txpool:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/geth-sharding/sharding/database"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages"
"github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/syncer"
"github.com/prysmaticlabs/geth-sharding/sharding/txpool"
@@ -73,14 +74,21 @@ func (p *Proposer) Stop() error {
// proposeCollations listens to the transaction feed and submits collations over an interval.
func (p *Proposer) proposeCollations() {
requests := make(chan *gethTypes.Transaction)
p.txpoolSub = p.txpool.TransactionsFeed().Subscribe(requests)
defer close(requests)
feed := p.p2p.Feed(messages.TransactionBroadcast{})
ch := make(chan p2p.Message, 20)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
defer close(ch)
for {
select {
case tx := <-requests:
log.Infof("Received transaction: %x", tx.Hash())
if err := p.createCollation(p.ctx, []*gethTypes.Transaction{tx}); err != nil {
case msg := <-ch:
tx, ok := msg.Data.(messages.TransactionBroadcast)
if !ok {
log.Error("Received incorrect p2p message. Wanted a transaction broadcast message")
break
}
log.Infof("Received transaction: %x", tx.Transaction.Hash())
if err := p.createCollation(p.ctx, []*gethTypes.Transaction{tx.Transaction}); err != nil {
log.Errorf("Create collation failed: %v", err)
}
case <-p.ctx.Done():

View File

@@ -11,6 +11,8 @@ go_library(
"//sharding/p2p/messages:go_default_library",
"//sharding/params:go_default_library",
"//sharding/syncer:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -2,9 +2,12 @@ package simulator
import (
"context"
"crypto/rand"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
@@ -44,6 +47,8 @@ func (s *Simulator) Start() {
log.Info("Starting simulator service")
s.requestFeed = s.p2p.Feed(messages.CollationBodyRequest{})
go s.broadcastTransactions(time.Tick(time.Second*s.delay), s.ctx.Done())
go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay), s.ctx.Done())
}
@@ -93,3 +98,28 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read
}
}
}
// broadcastTransactions sends a transaction with random bytes over by a delay period,
// this method is for testing purposes only, and will be replaced by a more functional CLI tool.
func (s *Simulator) broadcastTransactions(delayChan <-chan time.Time, done <-chan struct{}) {
for {
select {
// Makes sure to close this goroutine when the service stops.
case <-done:
log.Debug("Simulator context closed, exiting goroutine")
return
case <-delayChan:
tx := createTestTx()
s.p2p.Broadcast(messages.TransactionBroadcast{Transaction: tx})
log.Info("Transaction broadcasted")
}
}
}
// createTestTx is a helper method to generate tx with random data bytes.
// it is used for broadcastTransactions.
func createTestTx() *types.Transaction {
data := make([]byte, 1024)
rand.Read(data)
return types.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data)
}

View File

@@ -10,7 +10,7 @@ import (
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
@@ -94,11 +94,17 @@ func TestStartStop(t *testing.T) {
t.Fatalf("Unable to setup simulator service: %v", err)
}
simulator.Start()
msg := hook.LastEntry().Message
if msg != "Starting simulator service" {
t.Errorf("incorrect log, expected %s, got %s", "Starting simulator service", msg)
}
if err := simulator.Stop(); err != nil {
t.Fatalf("Unable to stop simulator service: %v", err)
}
msg := hook.LastEntry().Message
msg = hook.LastEntry().Message
if msg != "Stopping simulator service" {
t.Errorf("incorrect log, expected %s, got %s", "Stopping simulator service", msg)
}
@@ -229,3 +235,41 @@ func TestSimulateNotaryRequests(t *testing.T) {
exitRoutine <- true
hook.Reset()
}
// This test verifies actor simulator can successfully broadcast
// transactions to rest of the peers.
func TestBroadcastTransactions(t *testing.T) {
hook := logTest.NewGlobal()
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 1)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
delayChan := make(chan time.Time)
doneChan := make(chan struct{})
exitRoutine := make(chan bool)
go func() {
simulator.broadcastTransactions(delayChan, doneChan)
<-exitRoutine
}()
delayChan <- time.Time{}
doneChan <- struct{}{}
msg := hook.AllEntries()[0].Message
want := "Transaction broadcasted"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
exitRoutine <- true
hook.Reset()
}

View File

@@ -6,10 +6,10 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
)
// RespondCollationBody is called by a node responding to another node's request

View File

@@ -7,8 +7,6 @@ go_library(
visibility = ["//sharding:__subpackages__"],
deps = [
"//sharding/p2p:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -2,11 +2,6 @@
package txpool
import (
"crypto/rand"
"time"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
log "github.com/sirupsen/logrus"
@@ -16,7 +11,6 @@ import (
type TXPool struct {
p2p *p2p.Server
transactionsFeed *event.Feed
ticker *time.Ticker
}
// NewTXPool creates a new observer instance.
@@ -27,34 +21,10 @@ func NewTXPool(p2p *p2p.Server) (*TXPool, error) {
// Start the main routine for a shard transaction pool.
func (p *TXPool) Start() {
log.Info("Starting shard txpool service")
go p.sendTestTransaction()
}
// Stop the main loop for a transaction pool in the shard network.
func (p *TXPool) Stop() error {
log.Info("Stopping shard txpool service")
p.ticker.Stop()
return nil
}
func (p *TXPool) TransactionsFeed() *event.Feed {
return p.transactionsFeed
}
// sendTestTransaction sends a transaction with random bytes over a 5 second interval.
// This method is for testing purposes only, and will be replaced by a more functional CLI tool.
func (p *TXPool) sendTestTransaction() {
p.ticker = time.NewTicker(5 * time.Second)
for range p.ticker.C {
tx := createTestTransaction()
nsent := p.transactionsFeed.Send(tx)
log.Infof("Sent transaction %x to %d subscribers", tx.Hash(), nsent)
}
}
func createTestTransaction() *gethTypes.Transaction {
data := make([]byte, 1024)
rand.Read(data)
return gethTypes.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data)
}

View File

@@ -10,7 +10,7 @@ go_library(
importpath = "github.com/prysmaticlabs/geth-sharding/sharding/types",
visibility = ["//sharding:__subpackages__"],
deps = [
"//sharding/utils:go_default_library",
"//shared:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//crypto/sha3:go_default_library",
@@ -28,7 +28,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//sharding/database:go_default_library",
"//sharding/utils:go_default_library",
"//shared:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//crypto/sha3:go_default_library",

View File

@@ -9,7 +9,7 @@ import (
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/rlp"
"github.com/prysmaticlabs/geth-sharding/sharding/utils"
"github.com/prysmaticlabs/geth-sharding/shared"
)
// Collation defines a base struct that serves as a primitive equivalent of a "block"
@@ -142,11 +142,11 @@ func BytesToChunks(body []byte) Chunks {
}
// convertTxToRawBlob transactions into RawBlobs. This step encodes transactions uses RLP encoding
func convertTxToRawBlob(txs []*gethTypes.Transaction) ([]*utils.RawBlob, error) {
blobs := make([]*utils.RawBlob, len(txs))
func convertTxToRawBlob(txs []*gethTypes.Transaction) ([]*shared.RawBlob, error) {
blobs := make([]*shared.RawBlob, len(txs))
for i := 0; i < len(txs); i++ {
err := error(nil)
blobs[i], err = utils.NewRawBlob(txs[i], false)
blobs[i], err = shared.NewRawBlob(txs[i], false)
if err != nil {
return nil, err
}
@@ -161,7 +161,7 @@ func SerializeTxToBlob(txs []*gethTypes.Transaction) ([]byte, error) {
return nil, err
}
serializedTx, err := utils.Serialize(blobs)
serializedTx, err := shared.Serialize(blobs)
if err != nil {
return nil, err
}
@@ -174,13 +174,13 @@ func SerializeTxToBlob(txs []*gethTypes.Transaction) ([]byte, error) {
}
// convertRawBlobToTx converts raw blobs back to their original transactions.
func convertRawBlobToTx(rawBlobs []utils.RawBlob) ([]*gethTypes.Transaction, error) {
func convertRawBlobToTx(rawBlobs []shared.RawBlob) ([]*gethTypes.Transaction, error) {
blobs := make([]*gethTypes.Transaction, len(rawBlobs))
for i := 0; i < len(rawBlobs); i++ {
blobs[i] = gethTypes.NewTransaction(0, common.HexToAddress("0x"), nil, 0, nil, nil)
err := utils.ConvertFromRawBlob(&rawBlobs[i], blobs[i])
err := shared.ConvertFromRawBlob(&rawBlobs[i], blobs[i])
if err != nil {
return nil, fmt.Errorf("creation of transactions from raw blobs failed: %v", err)
}
@@ -191,7 +191,7 @@ func convertRawBlobToTx(rawBlobs []utils.RawBlob) ([]*gethTypes.Transaction, err
// DeserializeBlobToTx takes byte array blob and converts it back
// to original txs and returns the txs in tx array.
func DeserializeBlobToTx(serialisedBlob []byte) (*[]*gethTypes.Transaction, error) {
deserializedBlobs, err := utils.Deserialize(serialisedBlob)
deserializedBlobs, err := shared.Deserialize(serialisedBlob)
if err != nil {
return nil, err
}

View File

@@ -9,7 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/prysmaticlabs/geth-sharding/sharding/utils"
"github.com/prysmaticlabs/geth-sharding/shared"
)
// fieldAccess is to access unexported fields in structs in another package
@@ -206,9 +206,9 @@ func runSerializeNoRLPBenchmark(b *testing.B, numTransactions int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := utils.Serialize(blobs)
_, err := shared.Serialize(blobs)
if err != nil {
b.Errorf("utils.Serialize failed: %v", err)
b.Errorf("shared.Serialize failed: %v", err)
}
}
}
@@ -242,9 +242,9 @@ func runDeserializeNoRLPBenchmark(b *testing.B, numTransactions int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := utils.Deserialize(blob)
_, err := shared.Deserialize(blob)
if err != nil {
b.Errorf("utils.Deserialize failed: %v", err)
b.Errorf("shared.Deserialize failed: %v", err)
}
}
}

View File

@@ -6,7 +6,6 @@ go_library(
"customflags.go",
"debug.go",
"flags.go",
"marshal.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/geth-sharding/sharding/utils",
@@ -14,7 +13,6 @@ go_library(
deps = [
"//sharding/params:go_default_library",
"@com_github_ethereum_go_ethereum//node:go_default_library",
"@com_github_ethereum_go_ethereum//rlp:go_default_library",
"@com_github_fjl_memsize//memsizeui:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
@@ -25,7 +23,6 @@ go_test(
name = "go_default_test",
srcs = [
"customflags_test.go",
"marshal_test.go",
"service_test.go",
],
embed = [":go_default_library"],

15
shared/BUILD.bazel Normal file
View File

@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["marshal.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/shared",
visibility = ["//visibility:public"],
deps = ["@com_github_ethereum_go_ethereum//rlp:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["marshal_test.go"],
embed = [":go_default_library"],
)

View File

@@ -1,6 +1,6 @@
// Package utils defines independent utilities helpful for a sharding-enabled,
// Package shared defines independent utilities helpful for a sharding-enabled,
// Ethereum blockchain such as blob serialization as more.
package utils
package shared
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package utils
package shared
import (
"math/rand"