Handle and Simulate Collation Body Requests/Responses (#172)

sharding: create a syncer and a simulator package

Former-commit-id: b392885510ba5a96e61278cbbe2c0ec6f9722ee8 [formerly 3a435eaf6805d02beae55656f155b2c3a66ee663]
Former-commit-id: 0f6f3f2053ae77711e2072848b727b0dc9b92276
This commit is contained in:
Raul Jordan
2018-06-20 22:03:02 -05:00
committed by GitHub
parent 1eb7451266
commit 1ddb19bba6
20 changed files with 1170 additions and 54 deletions

View File

@@ -75,7 +75,7 @@ func (h *CollationHeader) AddSig(sig []byte) {
h.data.ProposerSignature = sig
}
// Signature of the collation corresponds to.
// Sig is the signature the collation corresponds to.
func (h *CollationHeader) Sig() []byte { return h.data.ProposerSignature }
// ShardID the collation corresponds to.

View File

@@ -3,6 +3,8 @@ package sharding
import (
"fmt"
"reflect"
"github.com/ethereum/go-ethereum/common"
)
// Node defines a a sharding-enabled Ethereum instance that provides
@@ -20,6 +22,12 @@ type Actor interface {
// TODO: will actors have actor-specific methods? To be decided.
}
// CollationFetcher defines functionality for a struct that is able to extract
// respond with collation information to the caller. Shard implements this interface.
type CollationFetcher interface {
CollationByHeaderHash(headerHash *common.Hash) (*Collation, error)
}
// ServiceContext is a collection of service independent options inherited from
// the protocol stack, that is passed to all constructors to be optionally used;
// as well as utility methods to operate on the service environment.

View File

@@ -1,3 +1,3 @@
# Sharding internal package
# Sharding Internal Package
This package should be used for test helpers and non-production code only!

View File

@@ -55,3 +55,13 @@ type Reader interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}
// RecordFetcher serves as an interface for a struct that can fetch collation information
// from a sharding manager contract on the Ethereum mainchain.
type RecordFetcher interface {
CollationRecords(opts *bind.CallOpts, arg0 *big.Int, arg1 *big.Int) (struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
}, error)
}

View File

@@ -138,16 +138,25 @@ func (s *SMCClient) ChainReader() ethereum.ChainReader {
// SMCCaller to interact with the sharding manager contract.
func (s *SMCClient) SMCCaller() *contracts.SMCCaller {
if s.smc == nil {
return nil
}
return &s.smc.SMCCaller
}
// SMCTransactor allows us to send tx's to the SMC programmatically.
func (s *SMCClient) SMCTransactor() *contracts.SMCTransactor {
if s.smc == nil {
return nil
}
return &s.smc.SMCTransactor
}
// SMCFilterer allows for easy filtering of events from the Sharding Manager Contract.
func (s *SMCClient) SMCFilterer() *contracts.SMCFilterer {
if s.smc == nil {
return nil
}
return &s.smc.SMCFilterer
}

View File

@@ -25,6 +25,8 @@ import (
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
"github.com/ethereum/go-ethereum/sharding/proposer"
"github.com/ethereum/go-ethereum/sharding/simulator"
"github.com/ethereum/go-ethereum/sharding/syncer"
"github.com/ethereum/go-ethereum/sharding/txpool"
"gopkg.in/urfave/cli.v1"
)
@@ -79,6 +81,18 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
return nil, err
}
// Should not trigger simulation requests if actor is a notary, as this
// is supposed to "simulate" notaries sending requests via p2p.
if actorFlag != "notary" {
if err := shardEthereum.registerSimulatorService(shardEthereum.shardConfig, shardIDFlag); err != nil {
return nil, err
}
}
if err := shardEthereum.registerSyncerService(shardEthereum.shardConfig, shardIDFlag); err != nil {
return nil, err
}
return shardEthereum, nil
}
@@ -237,8 +251,30 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
} else if actor == "proposer" {
var txPool *txpool.TXPool
ctx.RetrieveService(&txPool)
return proposer.NewProposer(config, smcClient, p2p, txPool, shardChainDB, shardID)
return proposer.NewProposer(config, smcClient, p2p, txPool, shardChainDB.DB(), shardID)
}
return observer.NewObserver(p2p, shardChainDB.DB(), shardID)
})
}
func (s *ShardEthereum) registerSimulatorService(config *params.Config, shardID int) error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
return simulator.NewSimulator(config, smcClient, p2p, shardID, 15) // 15 second delay between simulator requests.
})
}
func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int) error {
return s.Register(func(ctx *sharding.ServiceContext) (sharding.Service, error) {
var p2p *p2p.Server
ctx.RetrieveService(&p2p)
var smcClient *mainchain.SMCClient
ctx.RetrieveService(&smcClient)
var shardChainDB *database.ShardDB
ctx.RetrieveService(&shardChainDB)
return syncer.NewSyncer(config, smcClient, p2p, shardChainDB.DB(), shardID)
})
}

View File

@@ -25,9 +25,9 @@ type Observer struct {
// NewObserver creates a struct instance of a observer service,
// it will have access to a p2p server and a shardChainDb.
func NewObserver(p2p *p2p.Server, shardChainDb ethdb.Database, shardID int) (*Observer, error) {
func NewObserver(p2p *p2p.Server, shardChainDB ethdb.Database, shardID int) (*Observer, error) {
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDb)
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
return &Observer{p2p, shard, ctx, cancel}, nil
}

View File

@@ -24,10 +24,10 @@ import (
// // Wait until my message comes from a peer.
// msg := <- ch
// fmt.Printf("Message received: %v", msg.Data)
func (s *Server) Feed(msg interface{}) (*event.Feed, error) {
func (s *Server) Feed(msg interface{}) *event.Feed {
t := reflect.TypeOf(msg)
if s.feeds[t] == nil {
s.feeds[t] = new(event.Feed)
}
return s.feeds[t], nil
return s.feeds[t]
}

View File

@@ -15,11 +15,7 @@ func ExampleServer_Feed() {
Answer string
}
feed, err := s.Feed(Puzzle{})
if err != nil {
// This shouldn't happen, but we should handle it anyway.
panic(err)
}
feed := s.Feed(Puzzle{})
ch := make(chan Message, 5) // Small buffer size. I don't expect many puzzles.
sub := feed.Subscribe(ch)

View File

@@ -23,8 +23,8 @@ func TestFeed_ReturnsSameFeed(t *testing.T) {
s, _ := NewServer()
for _, tt := range tests {
feed1, _ := s.Feed(tt.a)
feed2, _ := s.Feed(tt.b)
feed1 := s.Feed(tt.a)
feed2 := s.Feed(tt.b)
if (feed1 == feed2) != tt.want {
t.Errorf("Expected %v == %v to be %t", feed1, feed2, tt.want)

View File

@@ -0,0 +1,23 @@
package messages
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// CollationBodyRequest defines a p2p message being sent over subscription feeds
// by notaries to other notaries or to proposers.
type CollationBodyRequest struct {
ChunkRoot *common.Hash
ShardID *big.Int
Period *big.Int
Proposer *common.Address
}
// CollationBodyResponse defines the p2p message response sent back
// to the requesting peer.
type CollationBodyResponse struct {
HeaderHash *common.Hash
Body []byte
}

View File

@@ -12,6 +12,30 @@ import (
"github.com/ethereum/go-ethereum/sharding/mainchain"
)
// AddHeader adds the collation header to the main chain by sending
// an addHeader transaction to the sharding manager contract.
// There can only exist one header per period per shard, it is the proposer's
// responsibility to check if a header has been added.
func AddHeader(transactor mainchain.ContractTransactor, collation *sharding.Collation) error {
log.Info("Adding header to SMC")
txOps, err := transactor.CreateTXOpts(big.NewInt(0))
if err != nil {
return fmt.Errorf("unable to initiate add header transaction: %v", err)
}
// TODO: Copy is inefficient here. Let's research how to best convert hash to [32]byte.
var chunkRoot [32]byte
copy(chunkRoot[:], collation.Header().ChunkRoot().Bytes())
tx, err := transactor.SMCTransactor().AddHeader(txOps, collation.Header().ShardID(), collation.Header().Period(), chunkRoot)
if err != nil {
return fmt.Errorf("unable to add header to SMC: %v", err)
}
log.Info(fmt.Sprintf("Add header transaction hash: %v", tx.Hash().Hex()))
return nil
}
// createCollation creates collation base struct with header
// and body. Header consists of shardID, ChunkRoot, period,
// proposer addr and signatures. Body contains serialized blob
@@ -55,34 +79,10 @@ func createCollation(caller mainchain.ContractCaller, account *accounts.Account,
return collation, nil
}
// addHeader adds the collation header to the main chain by sending
// an addHeader transaction to the sharding manager contract.
// There can only exist one header per period per shard, it's proposer's
// responsibility to check if a header has been added.
func addHeader(transactor mainchain.ContractTransactor, collation *sharding.Collation) error {
log.Info("Adding header to SMC")
txOps, err := transactor.CreateTXOpts(big.NewInt(0))
if err != nil {
return fmt.Errorf("unable to initiate add header transaction: %v", err)
}
// TODO: Copy is inefficient here. Let's research how to best convert hash to [32]byte.
var chunkRoot [32]byte
copy(chunkRoot[:], collation.Header().ChunkRoot().Bytes())
tx, err := transactor.SMCTransactor().AddHeader(txOps, collation.Header().ShardID(), collation.Header().Period(), chunkRoot)
if err != nil {
return fmt.Errorf("unable to add header to SMC: %v", err)
}
log.Info(fmt.Sprintf("Add header transaction hash: %v", tx.Hash().Hex()))
return nil
}
// checkHeaderAdded checks if a collation header has already
// submitted to the main chain. There can only be one header per shard
// per period, proposer should check if a header's already submitted,
// checkHeaderAdded returns true if it's available, false if it's unavailable.
// checkHeaderAdded returns true if it is available, false if it is unavailable.
func checkHeaderAdded(caller mainchain.ContractCaller, shardID *big.Int, period *big.Int) (bool, error) {
// Get the period of the last header.
lastPeriod, err := caller.SMCCaller().LastSubmittedCollation(&bind.CallOpts{}, shardID)

View File

@@ -8,9 +8,10 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/database"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
@@ -26,8 +27,8 @@ type Proposer struct {
p2p *p2p.Server
txpool *txpool.TXPool
txpoolSub event.Subscription
shardChainDb *database.ShardDB
shardID int
shardChainDb ethdb.Database
shard *sharding.Shard
ctx context.Context
cancel context.CancelFunc
}
@@ -35,29 +36,30 @@ type Proposer struct {
// NewProposer creates a struct instance of a proposer service.
// It will have access to a mainchain client, a p2p network,
// and a shard transaction pool.
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDb *database.ShardDB, shardID int) (*Proposer, error) {
func NewProposer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, txpool *txpool.TXPool, shardChainDB ethdb.Database, shardID int) (*Proposer, error) {
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
return &Proposer{
config,
client,
p2p,
txpool,
nil,
shardChainDb,
shardID,
shardChainDB,
shard,
ctx,
cancel}, nil
}
// Start the main loop for proposing collations.
func (p *Proposer) Start() {
log.Info(fmt.Sprintf("Starting proposer service in shard %d", p.shardID))
log.Info("Starting proposer service")
go p.proposeCollations()
}
// Stop the main loop for proposing collations.
func (p *Proposer) Stop() error {
log.Info(fmt.Sprintf("Stopping proposer service in shard %d", p.shardID))
log.Info(fmt.Sprintf("Stopping proposer service in shard %d", p.shard.ShardID()))
defer p.cancel()
p.txpoolSub.Unsubscribe()
return nil
@@ -67,7 +69,7 @@ func (p *Proposer) Stop() error {
func (p *Proposer) proposeCollations() {
requests := make(chan *types.Transaction)
p.txpoolSub = p.txpool.TransactionsFeed().Subscribe(requests)
defer close(requests)
for {
select {
case tx := <-requests:
@@ -94,18 +96,26 @@ func (p *Proposer) createCollation(ctx context.Context, txs []*types.Transaction
period := new(big.Int).Div(blockNumber.Number(), big.NewInt(p.config.PeriodLength))
// Create collation.
collation, err := createCollation(p.client, p.client.Account(), p.client, big.NewInt(int64(p.shardID)), period, txs)
collation, err := createCollation(p.client, p.client.Account(), p.client, p.shard.ShardID(), period, txs)
if err != nil {
return err
}
// Check SMC if we can submit header before addHeader
canAdd, err := checkHeaderAdded(p.client, big.NewInt(int64(p.shardID)), period)
// Saves the collation to persistent storage in the shardDB.
if err := p.shard.SaveCollation(collation); err != nil {
log.Error(fmt.Sprintf("Could not save collation to persistent storage: %v", err))
return nil
}
log.Info(fmt.Sprintf("Saved collation with header hash %v to shardChainDb", collation.Header().Hash().Hex()))
// Check SMC if we can submit header before addHeader.
canAdd, err := checkHeaderAdded(p.client, p.shard.ShardID(), period)
if err != nil {
return err
}
if canAdd {
addHeader(p.client, collation)
AddHeader(p.client, collation)
}
return nil

View File

@@ -169,7 +169,7 @@ func TestAddCollation(t *testing.T) {
}
// normal test case #1 create collation with normal parameters.
err = addHeader(node, collation)
err = AddHeader(node, collation)
if err != nil {
t.Errorf("%v", err)
}
@@ -215,7 +215,7 @@ func TestCheckCollation(t *testing.T) {
backend.Commit()
}
err = addHeader(node, collation)
err = AddHeader(node, collation)
if err != nil {
t.Errorf("%v", err)
}

View File

@@ -0,0 +1,114 @@
package simulator
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
"github.com/ethereum/go-ethereum/sharding/params"
"github.com/ethereum/go-ethereum/sharding/syncer"
)
// Simulator is a service in a shard node
// that simulates requests from remote notes coming over
// the shardp2p network. For example, if we are running a
// proposer service, we would want to simulate notary requests
// coming to us via a p2p feed. This service will be removed once
// p2p internals and end-to-end testing across remote nodes have been
// implemented.
type Simulator struct {
config *params.Config
client *mainchain.SMCClient
p2p *p2p.Server
shardID int
ctx context.Context
cancel context.CancelFunc
errChan chan error // Useful channel for handling errors at the service layer.
delay time.Duration // The delay (in seconds) between simulator requests sent via p2p.
requestSent chan int // Useful channel for handling outgoing requests from the service.
}
// NewSimulator creates a struct instance of a simulator service.
// It will have access to config, a mainchain client, a p2p server,
// and a shardID.
func NewSimulator(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardID int, delay time.Duration) (*Simulator, error) {
ctx, cancel := context.WithCancel(context.Background())
errChan := make(chan error)
requestSent := make(chan int)
return &Simulator{config, client, p2p, shardID, ctx, cancel, errChan, delay, requestSent}, nil
}
// Start the main loop for simulating p2p requests.
func (s *Simulator) Start() {
log.Info("Starting simulator service")
feed := s.p2p.Feed(messages.CollationBodyRequest{})
go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), feed)
go s.handleServiceErrors()
}
// Stop the main loop for simulator requests.
func (s *Simulator) Stop() error {
// Triggers a cancel call in the service's context which shuts down every goroutine
// in this service.
defer s.cancel()
log.Warn("Stopping simulator service")
return nil
}
// handleServiceErrors manages a goroutine that listens for errors broadcast to
// this service's error channel. This serves as a final step for error logging
// and is stopped upon the service shutting down.
func (s *Simulator) handleServiceErrors() {
for {
select {
case <-s.ctx.Done():
return
case err := <-s.errChan:
log.Error(fmt.Sprintf("Simulator service error: %v", err))
}
}
}
// simulateNotaryRequests simulates p2p message sent out by notaries
// once the system is in production. Notaries will be performing
// this action within their own service when they are selected on a shard, period
// pair to perform their responsibilities. This function in particular simulates
// requests for collation bodies that will be relayed to the appropriate proposer
// by the p2p feed layer.
func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, reader mainchain.Reader, feed *event.Feed) {
for {
select {
// Makes sure to close this goroutine when the service stops.
case <-s.ctx.Done():
return
case <-time.After(time.Second * s.delay):
blockNumber, err := reader.BlockByNumber(s.ctx, nil)
if err != nil {
s.errChan <- fmt.Errorf("could not fetch current block number: %v", err)
continue
}
period := new(big.Int).Div(blockNumber.Number(), big.NewInt(s.config.PeriodLength))
req, err := syncer.RequestCollationBody(fetcher, big.NewInt(int64(s.shardID)), period)
if err != nil {
s.errChan <- fmt.Errorf("error constructing collation body request: %v", err)
continue
}
if req != nil {
msg := p2p.Message{
Peer: p2p.Peer{},
Data: *req,
}
feed.Send(msg)
log.Info("Sent request for collation body via a shardp2p feed")
s.requestSent <- 1
}
}
}
}

View File

@@ -0,0 +1,265 @@
package simulator
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"testing"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
internal "github.com/ethereum/go-ethereum/sharding/internal"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
)
var _ = sharding.Service(&Simulator{})
type faultyReader struct{}
type goodReader struct{}
type faultySMCCaller struct{}
type goodSMCCaller struct{}
func (f *faultySMCCaller) CollationRecords(opts *bind.CallOpts, arg0 *big.Int, arg1 *big.Int) (struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
}, error) {
res := new(struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
})
return *res, errors.New("error fetching collation record")
}
func (g *goodSMCCaller) CollationRecords(opts *bind.CallOpts, arg0 *big.Int, arg1 *big.Int) (struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
}, error) {
res := new(struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
})
body := []byte{1, 2, 3, 4, 5}
res.ChunkRoot = [32]byte(types.DeriveSha(sharding.Chunks(body)))
res.Proposer = common.BytesToAddress([]byte{})
res.IsElected = false
return *res, nil
}
func (f *faultyReader) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return nil, fmt.Errorf("cannot fetch block by number")
}
func (f *faultyReader) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil
}
func (g *goodReader) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return types.NewBlock(&types.Header{Number: big.NewInt(0)}, nil, nil, nil), nil
}
func (g *goodReader) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil
}
func TestStartStop(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
simulator.Start()
h.VerifyLogMsg("Starting simulator service")
if err := simulator.Stop(); err != nil {
t.Fatalf("Unable to stop simulator service: %v", err)
}
h.VerifyLogMsg("Stopping simulator service")
// The context should have been canceled.
if simulator.ctx.Err() == nil {
t.Error("Context was not canceled")
}
}
// This test uses a faulty chain reader in order to trigger an error
// in the simulateNotaryRequests goroutine when reading the block number from
// the mainchain via RPC.
func TestSimulateNotaryRequests_FaultyReader(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
feed := server.Feed(messages.CollationBodyRequest{})
faultyReader := &faultyReader{}
go simulator.simulateNotaryRequests(&goodSMCCaller{}, faultyReader, feed)
receivedErr := <-simulator.errChan
expectedErr := "could not fetch current block number"
if !strings.Contains(receivedErr.Error(), expectedErr) {
t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
}
if err := simulator.Stop(); err != nil {
t.Fatalf("Unable to stop simulator service: %v", err)
}
h.VerifyLogMsg("Stopping simulator service")
// The context should have been canceled.
if simulator.ctx.Err() == nil {
t.Error("Context was not canceled")
}
}
// This test uses a faulty SMCCaller in order to trigger an error
// in the simulateNotaryRequests goroutine when reading the collation records
// from the SMC.
func TestSimulateNotaryRequests_FaultyCaller(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
feed := server.Feed(messages.CollationBodyRequest{})
reader := &goodReader{}
go simulator.simulateNotaryRequests(&faultySMCCaller{}, reader, feed)
receivedErr := <-simulator.errChan
expectedErr := "error constructing collation body request"
if !strings.Contains(receivedErr.Error(), expectedErr) {
t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
}
if err := simulator.Stop(); err != nil {
t.Fatalf("Unable to stop simulator service: %v", err)
}
h.VerifyLogMsg("Stopping simulator service")
// The context should have been canceled.
if simulator.ctx.Err() == nil {
t.Error("Context was not canceled")
}
}
// This test checks the proper functioning of the simulateNotaryRequests goroutine
// by listening to the requestSent channel which occurs after successful
// construction and sending of a request via p2p.
func TestSimulateNotaryRequests(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
feed := server.Feed(messages.CollationBodyRequest{})
reader := &goodReader{}
go simulator.simulateNotaryRequests(&goodSMCCaller{}, reader, feed)
<-simulator.requestSent
h.VerifyLogMsg("Sent request for collation body via a shardp2p feed")
simulator.cancel()
// The context should have been canceled.
if simulator.ctx.Err() == nil {
t.Fatal("Context was not canceled")
}
}
// TODO: Move this to the utils package along with the handleServiceErrors
// function.
func TestHandleServiceErrors(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 0)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}
go simulator.handleServiceErrors()
expectedErr := "testing the error channel"
complete := make(chan int)
go func() {
for {
select {
case <-simulator.ctx.Done():
return
default:
simulator.errChan <- errors.New(expectedErr)
complete <- 1
}
}
}()
<-complete
simulator.cancel()
// The context should have been canceled.
if simulator.ctx.Err() == nil {
t.Fatal("Context was not canceled")
}
time.Sleep(time.Millisecond * 500)
h.VerifyLogMsg(fmt.Sprintf("Simulator service error: %v", expectedErr))
}

View File

@@ -0,0 +1,74 @@
package syncer
import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
)
// RespondCollationBody is called by a node responding to another node's request
// for a collation body given a (shardID, chunkRoot, period, proposerAddress) tuple.
// The proposer will fetch the corresponding data from persistent storage (shardDB) by
// constructing a collation header from the input and calculating its hash.
func RespondCollationBody(req p2p.Message, signer mainchain.Signer, collationFetcher sharding.CollationFetcher) (*messages.CollationBodyResponse, error) {
// Type assertion helps us catch incorrect data requests.
msg, ok := req.Data.(messages.CollationBodyRequest)
if !ok {
return nil, fmt.Errorf("received incorrect data request type: %v", msg)
}
header := sharding.NewCollationHeader(msg.ShardID, msg.ChunkRoot, msg.Period, msg.Proposer, nil)
sig, err := signer.Sign(header.Hash())
if err != nil {
return nil, fmt.Errorf("could not sign received header: %v", err)
}
// Adds the signature to the header before calculating the hash used for db lookups.
header.AddSig(sig)
// Fetch the collation by its header hash from the shardChainDB.
headerHash := header.Hash()
collation, err := collationFetcher.CollationByHeaderHash(&headerHash)
if err != nil {
return nil, fmt.Errorf("could not fetch collation: %v", err)
}
return &messages.CollationBodyResponse{HeaderHash: &headerHash, Body: collation.Body()}, nil
}
// RequestCollationBody fetches a collation header record submitted to the SMC for
// a shardID, period pair and constructs a p2p collationBodyRequest that will
// then be relayed to the appropriate proposer that submitted the collation header.
// In production, this will be done within a notary service.
func RequestCollationBody(fetcher mainchain.RecordFetcher, shardID *big.Int, period *big.Int) (*messages.CollationBodyRequest, error) {
record, err := fetcher.CollationRecords(&bind.CallOpts{}, shardID, period)
if err != nil {
return nil, fmt.Errorf("could not fetch collation record from SMC: %v", err)
}
sum := 0
for _, val := range record.ChunkRoot {
sum += int(val)
}
if sum == 0 {
return nil, nil
}
// Converts from fixed size [32]byte to []byte slice.
chunkRoot := common.BytesToHash(record.ChunkRoot[:])
return &messages.CollationBodyRequest{
ChunkRoot: &chunkRoot,
ShardID: shardID,
Period: period,
Proposer: &record.Proposer,
}, nil
}

View File

@@ -0,0 +1,234 @@
package syncer
import (
"bytes"
"errors"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/contracts"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
"github.com/ethereum/go-ethereum/sharding/params"
"github.com/ethereum/go-ethereum/sharding/proposer"
)
var (
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key.PublicKey)
accountBalance, _ = new(big.Int).SetString("1001000000000000000000", 10)
)
// Mock client for testing proposer.
type mockNode struct {
smc *contracts.SMC
t *testing.T
depositFlag bool
backend *backends.SimulatedBackend
}
type faultySMCCaller struct{}
func (f *faultySMCCaller) CollationRecords(opts *bind.CallOpts, arg0 *big.Int, arg1 *big.Int) (struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
}, error) {
res := new(struct {
ChunkRoot [32]byte
Proposer common.Address
IsElected bool
})
return *res, errors.New("error fetching collation record")
}
func (m *mockNode) CreateTXOpts(value *big.Int) (*bind.TransactOpts, error) {
txOpts := transactOpts()
txOpts.Value = value
return txOpts, nil
}
func (m *mockNode) SMCTransactor() *contracts.SMCTransactor {
return &m.smc.SMCTransactor
}
func (m *mockNode) SMCCaller() *contracts.SMCCaller {
return &m.smc.SMCCaller
}
func (m *mockNode) GetShardCount() (int64, error) {
shardCount, err := m.SMCCaller().ShardCount(&bind.CallOpts{})
if err != nil {
return 0, err
}
return shardCount.Int64(), nil
}
type faultyRequest struct{}
type faultySigner struct{}
type faultyCollationFetcher struct{}
type mockSigner struct{}
type mockCollationFetcher struct{}
func (m *mockSigner) Sign(hash common.Hash) ([]byte, error) {
return []byte{}, nil
}
func (f *faultySigner) Sign(hash common.Hash) ([]byte, error) {
return []byte{}, errors.New("could not sign hash")
}
func (m *mockCollationFetcher) CollationByHeaderHash(headerHash *common.Hash) (*sharding.Collation, error) {
shardID := big.NewInt(1)
chunkRoot := common.BytesToHash([]byte{})
period := big.NewInt(1)
proposerAddress := common.BytesToAddress([]byte{})
header := sharding.NewCollationHeader(shardID, &chunkRoot, period, &proposerAddress, []byte{})
return sharding.NewCollation(header, []byte{}, []*types.Transaction{}), nil
}
func (f *faultyCollationFetcher) CollationByHeaderHash(headerHash *common.Hash) (*sharding.Collation, error) {
return nil, errors.New("could not fetch collation")
}
func transactOpts() *bind.TransactOpts {
return bind.NewKeyedTransactor(key)
}
func setup(t *testing.T) (*backends.SimulatedBackend, *contracts.SMC) {
backend := backends.NewSimulatedBackend(core.GenesisAlloc{addr: {Balance: accountBalance}})
_, _, smc, err := contracts.DeploySMC(transactOpts(), backend)
if err != nil {
t.Fatalf("Failed to deploy SMC contract: %v", err)
}
backend.Commit()
return backend, smc
}
func TestCollationBodyResponse(t *testing.T) {
proposerAddress := common.BytesToAddress([]byte{})
chunkRoot := common.BytesToHash([]byte{})
goodReq := messages.CollationBodyRequest{
ChunkRoot: &chunkRoot,
ShardID: big.NewInt(1),
Period: big.NewInt(1),
Proposer: &proposerAddress,
}
incorrectReq := faultyRequest{}
signer := &mockSigner{}
faultySigner := &faultySigner{}
fetcher := &mockCollationFetcher{}
faultyFetcher := &faultyCollationFetcher{}
badMsg := p2p.Message{
Peer: p2p.Peer{},
Data: incorrectReq,
}
goodMsg := p2p.Message{
Peer: p2p.Peer{},
Data: goodReq,
}
if _, err := RespondCollationBody(badMsg, signer, fetcher); err == nil {
t.Errorf("Incorrect request should throw error. Expecting messages.CollationBodyRequest{}, received: %v", incorrectReq)
}
if _, err := RespondCollationBody(goodMsg, faultySigner, fetcher); err == nil {
t.Error("Faulty signer should cause function to throw error. no error thrown.")
}
if _, err := RespondCollationBody(goodMsg, signer, faultyFetcher); err == nil {
t.Error("Faulty collatiom fetcher should cause function to throw error. no error thrown.")
}
header := sharding.NewCollationHeader(goodReq.ShardID, goodReq.ChunkRoot, goodReq.Period, goodReq.Proposer, []byte{})
body := []byte{}
response, err := RespondCollationBody(goodMsg, signer, fetcher)
if err != nil {
t.Fatalf("Could not construct collation body response: %v", err)
}
if response.HeaderHash.Hex() != header.Hash().Hex() {
t.Errorf("Incorrect header hash received. want: %v, received: %v", header.Hash().Hex(), response.HeaderHash.Hex())
}
if !bytes.Equal(response.Body, body) {
t.Errorf("Incorrect collation body received. want: %v, received: %v", response.Body, body)
}
}
func TestConstructNotaryRequest(t *testing.T) {
backend, smc := setup(t)
node := &mockNode{smc: smc, t: t, backend: backend}
// Fast forward to next period.
for i := 0; i < int(params.DefaultConfig.PeriodLength); i++ {
backend.Commit()
}
shardID := big.NewInt(0)
period := big.NewInt(1)
// We set the proposer address to the address used to setup the backend.
proposerAddress := addr
chunkRoot := common.BytesToHash([]byte("chunkroottest"))
header := sharding.NewCollationHeader(shardID, &chunkRoot, period, &addr, []byte{})
collation := sharding.NewCollation(header, []byte{}, []*types.Transaction{})
// Adds the header to the SMC.
if err := proposer.AddHeader(node, collation); err != nil {
t.Fatalf("Failed to add header to SMC: %v", err)
}
backend.Commit()
if _, err := RequestCollationBody(&faultySMCCaller{}, shardID, period); err == nil {
t.Errorf("Expected error from RequestCollationBody when using faulty SMCCaller, got nil")
}
request, err := RequestCollationBody(node.SMCCaller(), shardID, period)
if err != nil {
t.Fatalf("Could not construct request: %v", err)
}
// fetching an inexistent shardID, period pair from the SMC will return a nil request.
nilRequest, err := RequestCollationBody(node.SMCCaller(), big.NewInt(20), big.NewInt(20))
if err != nil {
t.Fatalf("Could not construct request: %v", err)
}
if nilRequest != nil {
t.Errorf("constructNotaryRequest should return nil for an inexistent collation header. got: %v", err)
}
if request.ChunkRoot.Hex() != chunkRoot.Hex() {
t.Errorf("Chunk root from notary request incorrect. want: %v, got: %v", chunkRoot.Hex(), request.ChunkRoot.Hex())
}
if request.Proposer.Hex() != proposerAddress.Hex() {
t.Errorf("Proposer address from notary request incorrect. want: %v, got: %v", proposerAddress.Hex(), request.Proposer.Hex())
}
if request.ShardID.Cmp(shardID) != 0 {
t.Errorf("ShardID from notary request incorrect. want: %s, got: %s", shardID, request.ShardID)
}
if request.Period.Cmp(period) != 0 {
t.Errorf("Proposer address from notary request incorrect. want: %s, got: %s", period, request.Period)
}
}

105
sharding/syncer/service.go Normal file
View File

@@ -0,0 +1,105 @@
package syncer
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
"github.com/ethereum/go-ethereum/sharding/params"
)
// Syncer represents a service that provides handlers for shard chain
// data requests/responses between remote nodes and event loops for
// performing windback sync across nodes, handling reorgs, and synchronizing
// items such as transactions and in future sharding iterations: state.
type Syncer struct {
config *params.Config
client *mainchain.SMCClient
shard *sharding.Shard
p2p *p2p.Server
ctx context.Context
cancel context.CancelFunc
errChan chan error // Useful channel for handling errors at the service layer.
responseSent chan int // Useful channel for handling outgoing responses from the service.
}
// NewSyncer creates a struct instance of a syncer service.
// It will have access to config, a mainchain client, a p2p server,
// a shardChainDb, and a shardID.
func NewSyncer(config *params.Config, client *mainchain.SMCClient, p2p *p2p.Server, shardChainDB ethdb.Database, shardID int) (*Syncer, error) {
ctx, cancel := context.WithCancel(context.Background())
shard := sharding.NewShard(big.NewInt(int64(shardID)), shardChainDB)
errChan := make(chan error)
responseSent := make(chan int)
return &Syncer{config, client, shard, p2p, ctx, cancel, errChan, responseSent}, nil
}
// Start the main loop for handling shard chain data requests.
func (s *Syncer) Start() {
log.Info("Starting sync service")
go s.handleCollationBodyRequests(s.client, s.p2p.Feed(messages.CollationBodyRequest{}))
go s.handleServiceErrors()
}
// Stop the main loop.
func (s *Syncer) Stop() error {
// Triggers a cancel call in the service's context which shuts down every goroutine
// in this service.
defer s.cancel()
log.Warn("Stopping sync service")
return nil
}
// handleServiceErrors manages a goroutine that listens for errors broadcast to
// this service's error channel. This serves as a final step for error logging
// and is stopped upon the service shutting down.
func (s *Syncer) handleServiceErrors() {
for {
select {
case <-s.ctx.Done():
return
case err := <-s.errChan:
log.Error(fmt.Sprintf("Sync service error: %v", err))
}
}
}
// handleCollationBodyRequests subscribes to messages from the shardp2p
// network and responds to a specific peer that requested the body using
// the Send method exposed by the p2p server's API (implementing the p2p.Sender interface).
func (s *Syncer) handleCollationBodyRequests(signer mainchain.Signer, feed *event.Feed) {
ch := make(chan p2p.Message, 100)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
// Makes sure to close this goroutine when the service stops.
case <-s.ctx.Done():
return
case req := <-ch:
if req.Data != nil {
log.Info(fmt.Sprintf("Received p2p request of type: %T", req))
res, err := RespondCollationBody(req, signer, s.shard)
if err != nil {
s.errChan <- fmt.Errorf("could not construct response: %v", err)
continue
}
// Reply to that specific peer only.
s.p2p.Send(*res, req.Peer)
log.Info(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", res.HeaderHash.Hex()))
s.responseSent <- 1
}
}
}
}

View File

@@ -0,0 +1,232 @@
package syncer
import (
"errors"
"fmt"
"math/big"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/sharding/mainchain"
"github.com/ethereum/go-ethereum/sharding/p2p/messages"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/sharding"
"github.com/ethereum/go-ethereum/sharding/database"
internal "github.com/ethereum/go-ethereum/sharding/internal"
"github.com/ethereum/go-ethereum/sharding/p2p"
"github.com/ethereum/go-ethereum/sharding/params"
)
var _ = sharding.Service(&Syncer{})
func TestStartStop(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardChainDB := database.NewShardKV()
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID)
if err != nil {
t.Fatalf("Unable to setup sync service: %v", err)
}
syncer.Start()
h.VerifyLogMsg("Starting sync service")
if err := syncer.Stop(); err != nil {
t.Fatalf("Unable to stop sync service: %v", err)
}
h.VerifyLogMsg("Stopping sync service")
// The context should have been canceled.
if syncer.ctx.Err() == nil {
t.Error("Context was not canceled")
}
}
// This test uses a faulty Signer interface in order to trigger an error
// in the simulateNotaryRequests goroutine when attempting to sign
// a collation header within the goroutine's internals.
func TestHandleCollationBodyRequests_FaultySigner(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardChainDB := database.NewShardKV()
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID)
if err != nil {
t.Fatalf("Unable to setup syncer service: %v", err)
}
feed := server.Feed(messages.CollationBodyRequest{})
go syncer.handleCollationBodyRequests(&faultySigner{}, feed)
go func() {
for {
select {
case <-syncer.ctx.Done():
return
default:
msg := p2p.Message{
Peer: p2p.Peer{},
Data: messages.CollationBodyRequest{},
}
feed.Send(msg)
}
}
}()
receivedErr := <-syncer.errChan
expectedErr := "could not construct response"
if !strings.Contains(receivedErr.Error(), expectedErr) {
t.Errorf("Expected error did not match. want: %v, got: %v", expectedErr, receivedErr)
}
syncer.cancel()
// The context should have been canceled.
if syncer.ctx.Err() == nil {
t.Fatal("Context was not canceled")
}
}
// This test checks the proper functioning of the handleCollationBodyRequests goroutine
// by listening to the responseSent channel which occurs after successful
// construction and sending of a response via p2p.
func TestHandleCollationBodyRequests(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardChainDB := database.NewShardKV()
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
body := []byte{1, 2, 3, 4, 5}
shardID := big.NewInt(0)
chunkRoot := types.DeriveSha(sharding.Chunks(body))
period := big.NewInt(0)
proposerAddress := common.BytesToAddress([]byte{})
signer := &mockSigner{}
header := sharding.NewCollationHeader(shardID, &chunkRoot, period, &proposerAddress, nil)
sig, err := signer.Sign(header.Hash())
if err != nil {
t.Fatalf("Could not sign header: %v", err)
}
// Adds the signature to the header before calculating the hash used for db lookups.
header.AddSig(sig)
// Stores the collation into the inmemory kv store shardChainDB.
collation := sharding.NewCollation(header, body, nil)
shard := sharding.NewShard(shardID, shardChainDB)
if err := shard.SaveCollation(collation); err != nil {
t.Fatalf("Could not store collation in shardChainDB: %v", err)
}
syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, 0)
if err != nil {
t.Fatalf("Unable to setup syncer service: %v", err)
}
feed := server.Feed(messages.CollationBodyRequest{})
go syncer.handleCollationBodyRequests(&mockSigner{}, feed)
go func() {
for {
select {
case <-syncer.ctx.Done():
return
default:
msg := p2p.Message{
Peer: p2p.Peer{},
Data: messages.CollationBodyRequest{
ChunkRoot: &chunkRoot,
ShardID: shardID,
Period: period,
Proposer: &proposerAddress,
},
}
feed.Send(msg)
}
}
}()
<-syncer.responseSent
h.VerifyLogMsg(fmt.Sprintf("Received p2p request of type: %T", p2p.Message{}))
h.VerifyLogMsg(fmt.Sprintf("Responding to p2p request with collation with headerHash: %v", header.Hash().Hex()))
syncer.cancel()
// The context should have been canceled.
if syncer.ctx.Err() == nil {
t.Fatal("Context was not canceled")
}
}
// TODO: Move this to the utils package along with the handleServiceErrors
// function.
func TestHandleServiceErrors(t *testing.T) {
h := internal.NewLogHandler(t)
log.Root().SetHandler(h)
shardChainDB := database.NewShardKV()
shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}
syncer, err := NewSyncer(params.DefaultConfig, &mainchain.SMCClient{}, server, shardChainDB, shardID)
if err != nil {
t.Fatalf("Unable to setup syncer service: %v", err)
}
go syncer.handleServiceErrors()
expectedErr := "testing the error channel"
complete := make(chan int)
go func() {
for {
select {
case <-syncer.ctx.Done():
return
default:
syncer.errChan <- errors.New(expectedErr)
complete <- 1
}
}
}()
<-complete
syncer.cancel()
// The context should have been canceled.
if syncer.ctx.Err() == nil {
t.Fatal("Context was not canceled")
}
time.Sleep(time.Millisecond * 500)
h.VerifyLogMsg(fmt.Sprintf("Sync service error: %v", expectedErr))
}