Archiver Service Definition (#3507)

* archive flags

* gaz

* create archiver

* register archiver in node

* registering the head updater feed

* add more gazelle

* cancel func

* test for service

* properly utilize the mocks

* lint

* remove extraneous log

* add back write to disk

* gaz
This commit is contained in:
Raul Jordan
2019-09-18 09:30:02 -05:00
committed by GitHub
parent 9ab08e6998
commit 037c01f4d7
14 changed files with 234 additions and 3 deletions

View File

@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/archiver",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -0,0 +1,75 @@
package archiver
import (
"context"
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "archiver")
// Service defining archiver functionality for persisting checkpointed
// beacon chain information to a database backend for historical purposes.
type Service struct {
ctx context.Context
cancel context.CancelFunc
beaconDB db.Database
newHeadNotifier blockchain.NewHeadNotifier
newHeadRootChan chan [32]byte
}
// Config options for the archiver service.
type Config struct {
BeaconDB db.Database
NewHeadNotifier blockchain.NewHeadNotifier
}
// NewArchiverService initializes the service from configuration options.
func NewArchiverService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
beaconDB: cfg.BeaconDB,
newHeadNotifier: cfg.NewHeadNotifier,
newHeadRootChan: make(chan [32]byte, 1),
}
}
// Start the archiver service event loop.
func (s *Service) Start() {
log.Info("Starting service")
go s.run()
}
// Stop the archiver service event loop.
func (s *Service) Stop() error {
defer s.cancel()
log.Info("Stopping service")
return nil
}
// Status reports the healthy status of the archiver. Returning nil means service
// is correctly running without error.
func (s *Service) Status() error {
return nil
}
func (s *Service) run() {
sub := s.newHeadNotifier.HeadUpdatedFeed().Subscribe(s.newHeadRootChan)
defer sub.Unsubscribe()
for {
select {
case h := <-s.newHeadRootChan:
log.WithField("headRoot", fmt.Sprintf("%#x", h)).Info("New chain head event")
case <-s.ctx.Done():
log.Info("Context closed, exiting goroutine")
return
case err := <-sub.Err():
log.WithError(err).Error("Subscription to new chain head notifier failed")
}
}
}

View File

@@ -0,0 +1,40 @@
package archiver
import (
"context"
"fmt"
"testing"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestArchiverService_ReceivesNewChainHead(t *testing.T) {
hook := logTest.NewGlobal()
ctx, cancel := context.WithCancel(context.Background())
svc := &Service{
ctx: ctx,
cancel: cancel,
newHeadRootChan: make(chan [32]byte, 0),
newHeadNotifier: &mock.ChainService{},
}
exitRoutine := make(chan bool)
go func() {
svc.run()
<-exitRoutine
}()
svc.newHeadRootChan <- [32]byte{1, 2, 3}
if err := svc.Stop(); err != nil {
t.Fatal(err)
}
exitRoutine <- true
// The context should have been canceled.
if svc.ctx.Err() != context.Canceled {
t.Error("context was not canceled")
}
testutil.AssertLogsContain(t, hook, fmt.Sprintf("%#x", [32]byte{1, 2, 3}))
testutil.AssertLogsContain(t, hook, "New chain head event")
}

View File

@@ -100,6 +100,9 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.BeaconB
s.reportSlotMetrics(block.Slot)
processedBlkNoPubsub.Inc()
// We write the latest saved head root to a feed for consumption by other services.
s.headUpdatedFeed.Send(bytesutil.ToBytes32(headRoot))
return nil
}
@@ -134,6 +137,8 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
// Reports on block and fork choice metrics.
s.reportSlotMetrics(block.Slot)
// We write the latest saved head root to a feed for consumption by other services.
s.headUpdatedFeed.Send(root)
processedBlkNoPubsubForkchoice.Inc()
return nil
}

View File

@@ -29,12 +29,18 @@ import (
"go.opencensus.io/trace"
)
// ChainFeeds interface defines the methods of the Service which provide
// information feeds.
// ChainFeeds interface defines the methods of the Service which provide state related
// information feeds to consumers.
type ChainFeeds interface {
StateInitializedFeed() *event.Feed
}
// NewHeadNotifier defines a struct which can notify many consumers of a new,
// canonical chain head event occuring in the node.
type NewHeadNotifier interface {
HeadUpdatedFeed() *event.Feed
}
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
@@ -48,6 +54,7 @@ type Service struct {
chainStartChan chan time.Time
genesisTime time.Time
stateInitializedFeed *event.Feed
headUpdatedFeed *event.Feed
p2p p2p.Broadcaster
maxRoutines int64
headSlot uint64
@@ -83,6 +90,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
forkChoiceStore: store,
chainStartChan: make(chan time.Time),
stateInitializedFeed: new(event.Feed),
headUpdatedFeed: new(event.Feed),
p2p: cfg.P2p,
canonicalRoots: make(map[uint64][]byte),
maxRoutines: cfg.MaxRoutines,
@@ -190,6 +198,11 @@ func (s *Service) StateInitializedFeed() *event.Feed {
return s.stateInitializedFeed
}
// HeadUpdatedFeed is a feed that is written to when a new head block is saved to DB.
func (s *Service) HeadUpdatedFeed() *event.Feed {
return s.headUpdatedFeed
}
// This gets called to update canonical root mapping.
func (s *Service) saveHead(ctx context.Context, b *ethpb.BeaconBlock, r [32]byte) error {
s.headSlot = b.Slot

View File

@@ -34,6 +34,7 @@ import (
// Ensure Service implements interfaces.
var _ = ChainFeeds(&Service{})
var _ = NewHeadNotifier(&Service{})
func init() {
logrus.SetLevel(logrus.DebugLevel)

View File

@@ -82,3 +82,8 @@ func (ms *ChainService) StateInitializedFeed() *event.Feed {
ms.StateFeed = new(event.Feed)
return ms.StateFeed
}
// HeadUpdatedFeed mocks the same method in the chain service.
func (ms *ChainService) HeadUpdatedFeed() *event.Feed {
return new(event.Feed)
}

View File

@@ -3,7 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"flags.go",
"archive.go",
"base.go",
"interop.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/flags",

View File

@@ -0,0 +1,32 @@
package flags
import (
"github.com/urfave/cli"
)
var (
// ArchiveEnableFlag defines whether or not the beacon chain should archive
// historical blocks, attestations, and validator set changes.
ArchiveEnableFlag = cli.BoolFlag{
Name: "archive",
Usage: "Whether or not beacon chain should archive historical data including blocks, attestations, and validator set changes",
}
// ArchiveValidatorSetChangesFlag defines whether or not the beacon chain should archive
// historical validator set changes in persistent storage.
ArchiveValidatorSetChangesFlag = cli.BoolFlag{
Name: "archive-validator-set-changes",
Usage: "Whether or not beacon chain should archive historical validator set changes",
}
// ArchiveBlocksFlag defines whether or not the beacon chain should archive
// historical block data in persistent storage.
ArchiveBlocksFlag = cli.BoolFlag{
Name: "archive-blocks",
Usage: "Whether or not beacon chain should archive historical blocks",
}
// ArchiveAttestationsFlag defines whether or not the beacon chain should archive
// historical attestation data in persistent storage.
ArchiveAttestationsFlag = cli.BoolFlag{
Name: "archive-attestations",
Usage: "Whether or not beacon chain should archive historical blocks",
}
)

View File

@@ -35,6 +35,10 @@ var appFlags = []cli.Flag{
flags.InteropGenesisStateFlag,
flags.InteropNumValidatorsFlag,
flags.InteropGenesisTimeFlag,
flags.ArchiveEnableFlag,
flags.ArchiveValidatorSetChangesFlag,
flags.ArchiveBlocksFlag,
flags.ArchiveAttestationsFlag,
cmd.BootstrapNode,
cmd.NoDiscovery,
cmd.StaticPeers,

View File

@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/node",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/archiver:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/db:go_default_library",

View File

@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/archiver"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
@@ -132,6 +133,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if err := beacon.registerArchiverService(ctx); err != nil {
return nil, err
}
if !ctx.GlobalBool(cmd.DisableMonitoringFlag.Name) {
if err := beacon.registerPrometheusService(ctx); err != nil {
return nil, err
@@ -509,3 +514,19 @@ func (b *BeaconNode) registerInteropServices(ctx *cli.Context) error {
}
return nil
}
func (b *BeaconNode) registerArchiverService(ctx *cli.Context) error {
shouldArchive := ctx.GlobalBool(flags.ArchiveEnableFlag.Name)
if !shouldArchive {
return nil
}
var chainService *blockchain.Service
if err := b.services.FetchService(&chainService); err != nil {
return err
}
svc := archiver.NewArchiverService(context.Background(), &archiver.Config{
BeaconDB: b.db,
NewHeadNotifier: chainService,
})
return b.services.RegisterService(svc)
}

View File

@@ -118,6 +118,15 @@ var appHelpFlagGroups = []flagGroup{
flags.InteropNumValidatorsFlag,
},
},
{
Name: "archive",
Flags: []cli.Flag{
flags.ArchiveEnableFlag,
flags.ArchiveValidatorSetChangesFlag,
flags.ArchiveBlocksFlag,
flags.ArchiveAttestationsFlag,
},
},
}
func init() {