shared: Abstract Service Registry Into Shared Folder, Add Beacon Node and Beacon Entry Point (#268)

Former-commit-id: 406ba2f1e65ec58e822fcf1b9d54c44ba51a559c [formerly 52aebe050663c4dc73fc56e5e4c6846620267f1f]
Former-commit-id: c959a9fda119e4403136ac4f8d1b345d464ab5df
This commit is contained in:
Raul Jordan
2018-07-13 21:15:37 -05:00
committed by GitHub
parent 739f34f26c
commit 83569f1342
16 changed files with 392 additions and 198 deletions

View File

@@ -1,8 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "go_default_library",
srcs = ["config.go"],
srcs = ["main.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/beacon-chain",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/node:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],
)
go_binary(
name = "beacon-chain",
embed = [":go_default_library"],
visibility = ["//beacon-chain:__subpackages__"],
)

36
beacon-chain/main.go Normal file
View File

@@ -0,0 +1,36 @@
package main
import (
"os"
"runtime"
"github.com/prysmaticlabs/geth-sharding/beacon-chain/node"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
func startNode(ctx *cli.Context) error {
beacon, err := node.New(ctx)
if err != nil {
return err
}
beacon.Start()
return nil
}
func main() {
app := cli.NewApp()
app.Name = "beacon-chain"
app.Usage = "this is a beacon chain implementation for Ethereum 2.0"
app.Action = startNode
app.Before = func(ctx *cli.Context) error {
runtime.GOMAXPROCS(runtime.NumCPU())
return nil
}
if err := app.Run(os.Args); err != nil {
log.Error(err.Error())
os.Exit(1)
}
}

View File

@@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["node.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/beacon-chain/node",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//shared:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["node_test.go"],
embed = [":go_default_library"],
deps = ["@com_github_urfave_cli//:go_default_library"],
)

74
beacon-chain/node/node.go Normal file
View File

@@ -0,0 +1,74 @@
package node
import (
"os"
"os/signal"
"sync"
"syscall"
"github.com/prysmaticlabs/geth-sharding/shared"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
// BeaconNode defines a struct that handles the services running a random beacon chain
// full PoS node. It handles the lifecycle of the entire system and registers
// services to a service registry.
type BeaconNode struct {
services *shared.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
}
// New creates a new node instance, sets up configuration options, and registers
// every required service to the node.
func New(ctx *cli.Context) (*BeaconNode, error) {
registry := shared.NewServiceRegistry()
beacon := &BeaconNode{
services: registry,
stop: make(chan struct{}),
}
return beacon, nil
}
// Start the BeaconNode and kicks off every registered service.
func (b *BeaconNode) Start() {
b.lock.Lock()
log.Info("Starting beacon node")
b.services.StartAll()
stop := b.stop
b.lock.Unlock()
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
log.Info("Got interrupt, shutting down...")
go b.Close()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Info("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
panic("Panic closing the beacon node")
}()
// Wait for stop channel to be closed.
<-stop
}
// Close handles graceful shutdown of the system.
func (b *BeaconNode) Close() {
b.lock.Lock()
defer b.lock.Unlock()
b.services.StopAll()
log.Info("Stopping beacon node")
close(b.stop)
}

View File

@@ -0,0 +1,18 @@
package node
import (
"testing"
"github.com/urfave/cli"
)
// Test that the sharding node can build with default flag values.
func TestNode_Builds(t *testing.T) {
app := cli.NewApp()
context := cli.NewContext(app, nil, nil)
_, err := New(context)
if err != nil {
t.Fatalf("Failed to create BeaconNode: %v", err)
}
}

View File

@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["config.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/beacon-chain/params",
visibility = ["//beacon-chain:__subpackages__"],
)

View File

@@ -1,4 +1,4 @@
package beacon
package params
const (
// AttesterCount is the number of attesters per committee/

View File

@@ -8,6 +8,7 @@ go_library(
deps = [
"//sharding/node:go_default_library",
"//sharding/utils:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],
)

View File

@@ -1,12 +1,12 @@
package main
import (
"fmt"
"os"
"runtime"
"github.com/prysmaticlabs/geth-sharding/sharding/node"
"github.com/prysmaticlabs/geth-sharding/sharding/utils"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
@@ -61,9 +61,7 @@ VERSION:
}
if err := app.Run(os.Args); err != nil {
if _, err := fmt.Fprintln(os.Stderr, err); err != nil {
panic(err)
}
log.Error(err.Error())
os.Exit(1)
}
}

View File

@@ -18,6 +18,7 @@ go_library(
"//sharding/txpool:go_default_library",
"//sharding/types:go_default_library",
"//sharding/utils:go_default_library",
"//shared:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_ethereum_go_ethereum//node:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
@@ -29,9 +30,5 @@ go_test(
name = "go_default_test",
srcs = ["backend_test.go"],
embed = [":go_default_library"],
deps = [
"//sharding/params:go_default_library",
"//sharding/types:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],
deps = ["@com_github_urfave_cli//:go_default_library"],
)

View File

@@ -8,7 +8,6 @@ import (
"fmt"
"os"
"os/signal"
"reflect"
"sync"
"syscall"
"time"
@@ -27,6 +26,7 @@ import (
"github.com/prysmaticlabs/geth-sharding/sharding/txpool"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/prysmaticlabs/geth-sharding/sharding/utils"
"github.com/prysmaticlabs/geth-sharding/shared"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
@@ -43,17 +43,17 @@ type ShardEthereum struct {
eventFeed *event.Feed // Used to enable P2P related interactions via different sharding actors.
// Lifecycle and service stores.
services map[reflect.Type]types.Service // Service registry.
serviceTypes []reflect.Type // Keeps an ordered slice of registered service types.
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications
services *shared.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
}
// New creates a new sharding-enabled Ethereum instance. This is called in the main
// geth sharding entrypoint.
func New(ctx *cli.Context) (*ShardEthereum, error) {
registry := shared.NewServiceRegistry()
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
services: registry,
stop: make(chan struct{}),
}
@@ -99,10 +99,7 @@ func (s *ShardEthereum) Start() {
log.Info("Starting sharding node")
for _, kind := range s.serviceTypes {
// Start each service in order of registration.
s.services[kind].Start()
}
s.services.StartAll()
stop := s.stop
s.lock.Unlock()
@@ -120,11 +117,10 @@ func (s *ShardEthereum) Start() {
log.Info("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
// ensure trace and CPU profile data is flushed.
panic("Panic closing the sharding node")
}()
// Wait for stop channel to be closed
// Wait for stop channel to be closed.
<-stop
}
@@ -133,44 +129,12 @@ func (s *ShardEthereum) Close() {
s.lock.Lock()
defer s.lock.Unlock()
for kind, service := range s.services {
if err := service.Stop(); err != nil {
log.Panicf("Could not stop the following service: %v, %v", kind, err)
}
}
s.services.StopAll()
log.Info("Stopping sharding node")
// unblock n.Wait
close(s.stop)
}
// registerService appends a service constructor function to the service registry of the
// sharding node.
func (s *ShardEthereum) registerService(service types.Service) error {
kind := reflect.TypeOf(service)
if _, exists := s.services[kind]; exists {
return fmt.Errorf("service already exists: %v", kind)
}
s.services[kind] = service
s.serviceTypes = append(s.serviceTypes, kind)
return nil
}
// fetchService takes in a struct pointer and sets the value of that pointer
// to a service currently stored in the service registry. This ensures the input argument is
// set to the right pointer that refers to the originally registered service.
func (s *ShardEthereum) fetchService(service interface{}) error {
if reflect.TypeOf(service).Kind() != reflect.Ptr {
return fmt.Errorf("input must be of pointer type, received value type instead: %T", service)
}
element := reflect.ValueOf(service).Elem()
if running, ok := s.services[element.Type()]; ok {
element.Set(reflect.ValueOf(running))
return nil
}
return fmt.Errorf("unknown service: %T", service)
}
// registerShardChainDB attaches a LevelDB wrapped object to the shardEthereum instance.
func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error {
path := node.DefaultDataDir()
@@ -181,21 +145,18 @@ func (s *ShardEthereum) registerShardChainDB(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("could not register shardDB service: %v", err)
}
return s.registerService(shardDB)
return s.services.RegisterService(shardDB)
}
// registerP2P attaches a p2p server to the ShardEthereum instance.
// TODO: Design this p2p service and the methods it should expose as well as
// its event loop.
func (s *ShardEthereum) registerP2P() error {
shardp2p, err := p2p.NewServer()
if err != nil {
return fmt.Errorf("could not register shardp2p service: %v", err)
}
return s.registerService(shardp2p)
return s.services.RegisterService(shardp2p)
}
// registerMainchainClient
func (s *ShardEthereum) registerMainchainClient(ctx *cli.Context) error {
path := node.DefaultDataDir()
if ctx.GlobalIsSet(utils.DataDirFlag.Name) {
@@ -216,7 +177,7 @@ func (s *ShardEthereum) registerMainchainClient(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("could not register smc client service: %v", err)
}
return s.registerService(client)
return s.services.RegisterService(client)
}
// registerTXPool is only relevant to proposers in the sharded system. It will
@@ -229,34 +190,34 @@ func (s *ShardEthereum) registerTXPool(actor string) error {
return nil
}
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
if err := s.services.FetchService(&shardp2p); err != nil {
return err
}
pool, err := txpool.NewTXPool(shardp2p)
if err != nil {
return fmt.Errorf("could not register shard txpool service: %v", err)
}
return s.registerService(pool)
return s.services.RegisterService(pool)
}
// Registers the actor according to CLI flags. Either notary/proposer/observer.
func (s *ShardEthereum) registerActorService(config *params.Config, actor string, shardID int) error {
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
if err := s.services.FetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
if err := s.services.FetchService(&client); err != nil {
return err
}
var shardChainDB *database.ShardDB
if err := s.fetchService(&shardChainDB); err != nil {
if err := s.services.FetchService(&shardChainDB); err != nil {
return err
}
var sync *syncer.Syncer
if err := s.fetchService(&sync); err != nil {
if err := s.services.FetchService(&sync); err != nil {
return err
}
@@ -266,10 +227,10 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
if err != nil {
return fmt.Errorf("could not register notary service: %v", err)
}
return s.registerService(not)
return s.services.RegisterService(not)
case "proposer":
var pool *txpool.TXPool
if err := s.fetchService(&pool); err != nil {
if err := s.services.FetchService(&pool); err != nil {
return err
}
@@ -277,19 +238,19 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
if err != nil {
return fmt.Errorf("could not register proposer service: %v", err)
}
return s.registerService(prop)
return s.services.RegisterService(prop)
case "simulator":
sim, err := simulator.NewSimulator(config, client, shardp2p, shardID, 15) // 15 second delay between simulator requests.
if err != nil {
return fmt.Errorf("could not register simulator service: %v", err)
}
return s.registerService(sim)
return s.services.RegisterService(sim)
default:
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID, sync, client)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
return s.services.RegisterService(obs)
}
}
@@ -301,11 +262,11 @@ func (s *ShardEthereum) registerSimulatorService(actorFlag string, config *param
}
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
if err := s.services.FetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
if err := s.services.FetchService(&client); err != nil {
return err
}
@@ -314,21 +275,21 @@ func (s *ShardEthereum) registerSimulatorService(actorFlag string, config *param
if err != nil {
return fmt.Errorf("could not register simulator service: %v", err)
}
return s.registerService(sim)
return s.services.RegisterService(sim)
}
func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int) error {
var shardp2p *p2p.Server
if err := s.fetchService(&shardp2p); err != nil {
if err := s.services.FetchService(&shardp2p); err != nil {
return err
}
var client *mainchain.SMCClient
if err := s.fetchService(&client); err != nil {
if err := s.services.FetchService(&client); err != nil {
return err
}
var shardChainDB *database.ShardDB
if err := s.fetchService(&shardChainDB); err != nil {
if err := s.services.FetchService(&shardChainDB); err != nil {
return err
}
@@ -336,5 +297,5 @@ func (s *ShardEthereum) registerSyncerService(config *params.Config, shardID int
if err != nil {
return fmt.Errorf("could not register syncer service: %v", err)
}
return s.registerService(sync)
return s.services.RegisterService(sync)
}

View File

@@ -2,37 +2,11 @@ package node
import (
"flag"
"reflect"
"testing"
"github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/urfave/cli"
)
// Verifies that ShardEthereum implements the Node interface.
var _ = types.Node(&ShardEthereum{})
type mockService struct{}
type secondMockService struct{}
func (m *mockService) Start() {
return
}
func (m *mockService) Stop() error {
return nil
}
func (s *secondMockService) Start() {
return
}
func (s *secondMockService) Stop() error {
return nil
}
// Test that the sharding node can build with default flag values.
func TestNode_Builds(t *testing.T) {
app := cli.NewApp()
@@ -45,92 +19,3 @@ func TestNode_Builds(t *testing.T) {
t.Fatalf("Failed to create ShardEthereum: %v", err)
}
}
func TestRegisterServiceTwice(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
// checks if first service was indeed registered
if len(shardEthereum.serviceTypes) != 1 {
t.Fatalf("service types slice should contain 1 service, contained %v", len(shardEthereum.serviceTypes))
}
if err := shardEthereum.registerService(m); err == nil {
t.Errorf("should not be able to register a service twice, got nil error")
}
}
func TestRegisterDifferentServices(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
s := &secondMockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := shardEthereum.registerService(s); err != nil {
t.Fatalf("failed to register second service")
}
if len(shardEthereum.serviceTypes) != 2 {
t.Errorf("service types slice should contain 2 services, contained %v", len(shardEthereum.serviceTypes))
}
if _, exists := shardEthereum.services[reflect.TypeOf(m)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(m))
}
if _, exists := shardEthereum.services[reflect.TypeOf(s)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(s))
}
}
func TestFetchService(t *testing.T) {
shardEthereum := &ShardEthereum{
services: make(map[reflect.Type]types.Service),
stop: make(chan struct{}),
}
// Configure shardConfig by loading the default.
shardEthereum.shardConfig = params.DefaultConfig
m := &mockService{}
if err := shardEthereum.registerService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := shardEthereum.fetchService(*m); err == nil {
t.Errorf("passing in a value should throw an error, received nil error")
}
var s *secondMockService
if err := shardEthereum.fetchService(&s); err == nil {
t.Errorf("fetching an unregistered service should return an error, got nil")
}
var m2 *mockService
if err := shardEthereum.fetchService(&m2); err != nil {
t.Fatalf("failed to fetch service")
}
if m2 != m {
t.Errorf("pointers were not equal, instead got %p, %p", m2, m)
}
}

View File

@@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["marshal.go"],
srcs = [
"marshal.go",
"service_registry.go",
"types.go",
],
importpath = "github.com/prysmaticlabs/geth-sharding/shared",
visibility = ["//visibility:public"],
deps = ["@com_github_ethereum_go_ethereum//rlp:go_default_library"],
@@ -10,6 +14,9 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["marshal_test.go"],
srcs = [
"marshal_test.go",
"service_registry_test.go",
],
embed = [":go_default_library"],
)

View File

@@ -0,0 +1,65 @@
package shared
import (
"fmt"
"log"
"reflect"
)
// ServiceRegistry provides a useful pattern for managing services.
// It allows for ease of dependency management and ensures services
// dependent on others use the same references in memory.
type ServiceRegistry struct {
services map[reflect.Type]Service // map of types to services.
serviceTypes []reflect.Type // keep an ordered slice of registered service types.
}
// NewServiceRegistry starts a registry instance for convenience
func NewServiceRegistry() *ServiceRegistry {
return &ServiceRegistry{
services: make(map[reflect.Type]Service),
}
}
// StartAll initialized each service in order of registration.
func (s *ServiceRegistry) StartAll() {
for _, kind := range s.serviceTypes {
s.services[kind].Start()
}
}
// StopAll ends every service, logging a panic if any of them fail to stop.
func (s *ServiceRegistry) StopAll() {
for kind, service := range s.services {
if err := service.Stop(); err != nil {
log.Panicf("Could not stop the following service: %v, %v", kind, err)
}
}
}
// RegisterService appends a service constructor function to the service
// registry.
func (s *ServiceRegistry) RegisterService(service Service) error {
kind := reflect.TypeOf(service)
if _, exists := s.services[kind]; exists {
return fmt.Errorf("service already exists: %v", kind)
}
s.services[kind] = service
s.serviceTypes = append(s.serviceTypes, kind)
return nil
}
// FetchService takes in a struct pointer and sets the value of that pointer
// to a service currently stored in the service registry. This ensures the input argument is
// set to the right pointer that refers to the originally registered service.
func (s *ServiceRegistry) FetchService(service interface{}) error {
if reflect.TypeOf(service).Kind() != reflect.Ptr {
return fmt.Errorf("input must be of pointer type, received value type instead: %T", service)
}
element := reflect.ValueOf(service).Elem()
if running, ok := s.services[element.Type()]; ok {
element.Set(reflect.ValueOf(running))
return nil
}
return fmt.Errorf("unknown service: %T", service)
}

View File

@@ -0,0 +1,102 @@
package shared
import (
"reflect"
"testing"
)
type mockService struct{}
type secondMockService struct{}
func (m *mockService) Start() {
return
}
func (m *mockService) Stop() error {
return nil
}
func (s *secondMockService) Start() {
return
}
func (s *secondMockService) Stop() error {
return nil
}
func TestRegisterServiceTwice(t *testing.T) {
registry := &ServiceRegistry{
services: make(map[reflect.Type]Service),
}
m := &mockService{}
if err := registry.RegisterService(m); err != nil {
t.Fatalf("failed to register first service")
}
// Checks if first service was indeed registered.
if len(registry.serviceTypes) != 1 {
t.Fatalf("service types slice should contain 1 service, contained %v", len(registry.serviceTypes))
}
if err := registry.RegisterService(m); err == nil {
t.Errorf("should not be able to register a service twice, got nil error")
}
}
func TestRegisterDifferentServices(t *testing.T) {
registry := &ServiceRegistry{
services: make(map[reflect.Type]Service),
}
m := &mockService{}
s := &secondMockService{}
if err := registry.RegisterService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := registry.RegisterService(s); err != nil {
t.Fatalf("failed to register second service")
}
if len(registry.serviceTypes) != 2 {
t.Errorf("service types slice should contain 2 services, contained %v", len(registry.serviceTypes))
}
if _, exists := registry.services[reflect.TypeOf(m)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(m))
}
if _, exists := registry.services[reflect.TypeOf(s)]; !exists {
t.Errorf("service of type %v not registered", reflect.TypeOf(s))
}
}
func TestFetchService(t *testing.T) {
registry := &ServiceRegistry{
services: make(map[reflect.Type]Service),
}
m := &mockService{}
if err := registry.RegisterService(m); err != nil {
t.Fatalf("failed to register first service")
}
if err := registry.FetchService(*m); err == nil {
t.Errorf("passing in a value should throw an error, received nil error")
}
var s *secondMockService
if err := registry.FetchService(&s); err == nil {
t.Errorf("fetching an unregistered service should return an error, got nil")
}
var m2 *mockService
if err := registry.FetchService(&m2); err != nil {
t.Fatalf("failed to fetch service")
}
if m2 != m {
t.Errorf("pointers were not equal, instead got %p, %p", m2, m)
}
}

11
shared/types.go Normal file
View File

@@ -0,0 +1,11 @@
package shared
// Service is a struct that can be registered into a ServiceRegistry for
// easy dependency management.
type Service interface {
// Start spawns any goroutines required by the service.
Start()
// Stop terminates all goroutines belonging to the service,
// blocking until they are all terminated.
Stop() error
}