new prometheus metrics for client-stats metrics (#8834)

This commit is contained in:
kasey
2021-04-30 15:37:38 -05:00
committed by GitHub
parent 2fdfda2804
commit 7be6fc1780
18 changed files with 625 additions and 215 deletions

View File

@@ -14,6 +14,13 @@ func NewDB(ctx context.Context, dirPath string, config *kv.Config) (Database, er
return kv.NewKVStore(ctx, dirPath, config)
}
// NewDBFilename uses the KVStoreDatafilePath so that if this layer of
// indirection between db.NewDB->kv.NewKVStore ever changes, it will be easy to remember
// to also change this filename indirection at the same time.
func NewDBFilename(dirPath string) string {
return kv.KVStoreDatafilePath(dirPath)
}
// NewSlasherDB initializes a new DB for slasher.
func NewSlasherDB(ctx context.Context, dirPath string, config *slasherkv.Config) (SlasherDatabase, error) {
return slasherkv.NewKVStore(ctx, dirPath, config)

View File

@@ -64,6 +64,13 @@ type Store struct {
ctx context.Context
}
// KVStoreDatafilePath is the canonical construction of a full
// database file path from the directory path, so that code outside
// this package can find the full path in a consistent way.
func KVStoreDatafilePath(dirPath string) string {
return path.Join(dirPath, DatabaseFileName)
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
@@ -77,7 +84,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
return nil, err
}
}
datafile := path.Join(dirPath, DatabaseFileName)
datafile := KVStoreDatafilePath(dirPath)
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,

View File

@@ -7,6 +7,7 @@ go_library(
"config.go",
"log.go",
"node.go",
"prometheus.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/node",
visibility = [
@@ -48,6 +49,7 @@ go_library(
"//shared/version:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",

View File

@@ -73,6 +73,7 @@ type BeaconNode struct {
opFeed *event.Feed
forkChoiceStore forkchoice.ForkChoicer
stateGen *stategen.State
collector *bcnodeCollector
}
// New creates a new node instance, sets up configuration options, and registers
@@ -162,6 +163,15 @@ func New(cliCtx *cli.Context) (*BeaconNode, error) {
}
}
// db.DatabasePath is the path to the containing directory
// db.NewDBFilename expands that to the canonical full path using
// the same constuction as NewDB()
c, err := newBeaconNodePromCollector(db.NewDBFilename(beacon.db.DatabasePath()))
if err != nil {
return nil, err
}
beacon.collector = c
return beacon, nil
}
@@ -224,6 +234,7 @@ func (b *BeaconNode) Close() {
if err := b.db.Close(); err != nil {
log.Errorf("Failed to close database: %v", err)
}
b.collector.unregister()
b.cancel()
close(b.stop)
}
@@ -435,15 +446,22 @@ func (b *BeaconNode) registerPOWChainService() error {
return err
}
cfg := &powchain.Web3ServiceConfig{
HttpEndpoints: endpoints,
DepositContract: common.HexToAddress(depAddress),
BeaconDB: b.db,
DepositCache: b.depositCache,
StateNotifier: b,
StateGen: b.stateGen,
Eth1HeaderReqLimit: b.cliCtx.Uint64(flags.Eth1HeaderReqLimit.Name),
bs, err := powchain.NewPowchainCollector(b.ctx)
if err != nil {
return err
}
cfg := &powchain.Web3ServiceConfig{
HttpEndpoints: endpoints,
DepositContract: common.HexToAddress(depAddress),
BeaconDB: b.db,
DepositCache: b.depositCache,
StateNotifier: b,
StateGen: b.stateGen,
Eth1HeaderReqLimit: b.cliCtx.Uint64(flags.Eth1HeaderReqLimit.Name),
BeaconNodeStatsUpdater: bs,
}
web3Service, err := powchain.NewService(b.ctx, cfg)
if err != nil {
return errors.Wrap(err, "could not register proof-of-work chain web3Service")

View File

@@ -0,0 +1,61 @@
package node
import (
"fmt"
"os"
"github.com/prometheus/client_golang/prometheus"
)
type bcnodeCollector struct {
DiskBeaconchainBytesTotal *prometheus.Desc
dbPath string
}
func newBeaconNodePromCollector(dbPath string) (*bcnodeCollector, error) {
namespace := "bcnode"
c := &bcnodeCollector{
DiskBeaconchainBytesTotal: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "disk_beaconchain_bytes_total"),
"Total hard disk space used by the beaconchain database, in bytes.",
nil,
nil,
),
dbPath: dbPath,
}
_, err := c.getCurrentDbBytes()
if err != nil {
return nil, err
}
return c, prometheus.Register(c)
}
func (bc *bcnodeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- bc.DiskBeaconchainBytesTotal
}
func (bc *bcnodeCollector) Collect(ch chan<- prometheus.Metric) {
dbBytes, err := bc.getCurrentDbBytes()
if err != nil {
log.Warn(err)
return
}
ch <- prometheus.MustNewConstMetric(
bc.DiskBeaconchainBytesTotal,
prometheus.GaugeValue,
dbBytes,
)
}
func (bc *bcnodeCollector) getCurrentDbBytes() (float64, error) {
fs, err := os.Stat(bc.dbPath)
if err != nil {
return 0, fmt.Errorf("could not collect database file size for prometheus, path=%s, err=%s", bc.dbPath, err)
}
return float64(fs.Size()), nil
}
func (bc *bcnodeCollector) unregister() {
prometheus.Unregister(bc)
}

View File

@@ -9,6 +9,7 @@ go_library(
"deposit.go",
"log.go",
"log_processing.go",
"prometheus.go",
"provider.go",
"service.go",
],
@@ -32,6 +33,7 @@ go_library(
"//contracts/deposit-contract:go_default_library",
"//proto/beacon/db:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/clientstats:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/httputils:go_default_library",
"//shared/httputils/authorizationmethod:go_default_library",
@@ -67,6 +69,7 @@ go_test(
"init_test.go",
"log_processing_test.go",
"powchain_test.go",
"prometheus_test.go",
"provider_test.go",
"service_test.go",
],
@@ -84,6 +87,7 @@ go_test(
"//proto/beacon/db:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/clientstats:go_default_library",
"//shared/event:go_default_library",
"//shared/httputils:go_default_library",
"//shared/httputils/authorizationmethod:go_default_library",
@@ -98,6 +102,7 @@ go_test(
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//trie:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -0,0 +1,153 @@
package powchain
import (
"context"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prysmaticlabs/prysm/shared/clientstats"
)
type BeaconNodeStatsUpdater interface {
Update(stats clientstats.BeaconNodeStats)
}
type PowchainCollector struct {
SyncEth1Connected *prometheus.Desc
SyncEth1FallbackConnected *prometheus.Desc
SyncEth1FallbackConfigured *prometheus.Desc // true if flag specified: --fallback-web3provider
updateChan chan clientstats.BeaconNodeStats
latestStats clientstats.BeaconNodeStats
sync.Mutex
ctx context.Context
finishChan chan struct{}
}
var _ BeaconNodeStatsUpdater = &PowchainCollector{}
var _ prometheus.Collector = &PowchainCollector{}
// Update satisfies the BeaconNodeStatsUpdater
func (pc *PowchainCollector) Update(update clientstats.BeaconNodeStats) {
pc.updateChan <- update
}
// Describe is invoked by the prometheus collection loop.
// It returns a set of metric Descriptor references which
// are also used in Collect to group collected metrics into
// a family. Describe and Collect together satisfy the
// prometheus.Collector interface.
func (pc *PowchainCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- pc.SyncEth1Connected
ch <- pc.SyncEth1FallbackConfigured
ch <- pc.SyncEth1FallbackConnected
}
// Collect is invoked by the prometheus collection loop.
// It returns a set of Metrics representing the observation
// for the current collection period. In the case of this
// collector, we use values from the latest BeaconNodeStats
// value sent by the powchain Service, which updates this value
// whenever an internal event could change the state of one of
// the metrics.
// Describe and Collect together satisfy the
// prometheus.Collector interface.
func (pc *PowchainCollector) Collect(ch chan<- prometheus.Metric) {
bs := pc.getLatestStats()
var syncEth1FallbackConfigured float64 = 0
if bs.SyncEth1FallbackConfigured {
syncEth1FallbackConfigured = 1
}
ch <- prometheus.MustNewConstMetric(
pc.SyncEth1FallbackConfigured,
prometheus.GaugeValue,
syncEth1FallbackConfigured,
)
var syncEth1FallbackConnected float64 = 0
if bs.SyncEth1FallbackConnected {
syncEth1FallbackConnected = 1
}
ch <- prometheus.MustNewConstMetric(
pc.SyncEth1FallbackConnected,
prometheus.GaugeValue,
syncEth1FallbackConnected,
)
var syncEth1Connected float64 = 0
if bs.SyncEth1Connected {
syncEth1Connected = 1
}
ch <- prometheus.MustNewConstMetric(
pc.SyncEth1Connected,
prometheus.GaugeValue,
syncEth1Connected,
)
}
func (pc *PowchainCollector) getLatestStats() clientstats.BeaconNodeStats {
pc.Lock()
defer pc.Unlock()
return pc.latestStats
}
func (pc *PowchainCollector) setLatestStats(bs clientstats.BeaconNodeStats) {
pc.Lock()
pc.latestStats = bs
pc.Unlock()
}
// unregister returns true if the prometheus DefaultRegistry
// confirms that it was removed.
func (pc *PowchainCollector) unregister() bool {
return prometheus.Unregister(pc)
}
func (pc *PowchainCollector) latestStatsUpdateLoop() {
for {
select {
case <-pc.ctx.Done():
pc.unregister()
pc.finishChan <- struct{}{}
return
case bs := <-pc.updateChan:
pc.setLatestStats(bs)
}
}
}
func NewPowchainCollector(ctx context.Context) (*PowchainCollector, error) {
namespace := "powchain"
updateChan := make(chan clientstats.BeaconNodeStats, 2)
c := &PowchainCollector{
SyncEth1FallbackConfigured: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "sync_eth1_fallback_configured"),
"Boolean recording whether a fallback eth1 endpoint was configured: 0=false, 1=true.",
nil,
nil,
),
SyncEth1FallbackConnected: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "sync_eth1_fallback_connected"),
"Boolean indicating whether a fallback eth1 endpoint is currently connected: 0=false, 1=true.",
nil,
nil,
),
SyncEth1Connected: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "sync_eth1_connected"),
"Boolean indicating whether a fallback eth1 endpoint is currently connected: 0=false, 1=true.",
nil,
nil,
),
updateChan: updateChan,
ctx: ctx,
finishChan: make(chan struct{}, 1),
}
go c.latestStatsUpdateLoop()
return c, prometheus.Register(c)
}
type NopBeaconNodeStatsUpdater struct{}
func (nop *NopBeaconNodeStatsUpdater) Update(stats clientstats.BeaconNodeStats) {}
var _ BeaconNodeStatsUpdater = &NopBeaconNodeStatsUpdater{}

View File

@@ -0,0 +1,53 @@
package powchain
import (
"context"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
// TestCleanup ensures that the cleanup function unregisters the prometheus.Collection
// also tests the interchangability of the explicit prometheus Register/Unregister
// and the implicit methods within the collector implementation
func TestCleanup(t *testing.T) {
ctx := context.Background()
pc, err := NewPowchainCollector(ctx)
assert.NoError(t, err, "Uxpected error caling NewPowchainCollector")
unregistered := pc.unregister()
assert.Equal(t, true, unregistered, "PowchainCollector.unregister did not return true (via prometheus.DefaultRegistry)")
// PowchainCollector is a prometheus.Collector, so we should be able to register it again
err = prometheus.Register(pc)
assert.NoError(t, err, "Got error from prometheus.Register after unregistering PowchainCollector")
// even if it somehow gets registered somewhere else, unregister should work
unregistered = pc.unregister()
assert.Equal(t, true, unregistered, "PowchainCollector.unregister failed on the second attempt")
// and so we should be able to register it again
err = prometheus.Register(pc)
assert.NoError(t, err, "Got error from prometheus.Register on the second attempt")
// ok clean it up one last time for real :)
unregistered = prometheus.Unregister(pc)
assert.Equal(t, true, unregistered, "prometheus.Unregister failed to unregister PowchainCollector on final cleanup")
}
// TestCancelation tests that canceling the context passed into
// NewPowchainCollector cleans everything up as expected. This
// does come at the cost of an extra channel cluttering up
// PowchainCollector, just for this test.
func TestCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
pc, err := NewPowchainCollector(ctx)
assert.NoError(t, err, "Uxpected error caling NewPowchainCollector")
ticker := time.NewTicker(10 * time.Second)
cancel()
select {
case <-ticker.C:
t.Error("Hit timeout waiting for cancel() to cleanup PowchainCollector")
case <-pc.finishChan:
break
}
err = prometheus.Register(pc)
assert.NoError(t, err, "Got error from prometheus.Register after unregistering PowchainCollector through canceled context")
}

View File

@@ -36,6 +36,7 @@ import (
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
protodb "github.com/prysmaticlabs/prysm/proto/beacon/db"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/clientstats"
"github.com/prysmaticlabs/prysm/shared/httputils"
"github.com/prysmaticlabs/prysm/shared/httputils/authorizationmethod"
"github.com/prysmaticlabs/prysm/shared/logutil"
@@ -146,17 +147,19 @@ type Service struct {
lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam.
runError error
preGenesisState iface.BeaconState
bsUpdater BeaconNodeStatsUpdater
}
// Web3ServiceConfig defines a config struct for web3 service to use through its life cycle.
type Web3ServiceConfig struct {
HttpEndpoints []string
DepositContract common.Address
BeaconDB db.HeadAccessDatabase
DepositCache *depositcache.DepositCache
StateNotifier statefeed.Notifier
StateGen *stategen.State
Eth1HeaderReqLimit uint64
HttpEndpoints []string
DepositContract common.Address
BeaconDB db.HeadAccessDatabase
DepositCache *depositcache.DepositCache
StateNotifier statefeed.Notifier
StateGen *stategen.State
Eth1HeaderReqLimit uint64
BeaconNodeStatsUpdater BeaconNodeStatsUpdater
}
// NewService sets up a new instance with an ethclient when
@@ -210,6 +213,12 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
lastReceivedMerkleIndex: -1,
preGenesisState: genState,
headTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
// use the nop updater by default, rely on upstream set up to pass in an appropriate impl
bsUpdater: config.BeaconNodeStatsUpdater,
}
if config.BeaconNodeStatsUpdater == nil {
s.bsUpdater = &NopBeaconNodeStatsUpdater{}
}
if err := s.ensureValidPowchainData(ctx); err != nil {
@@ -223,6 +232,7 @@ func NewService(ctx context.Context, config *Web3ServiceConfig) (*Service, error
if err := s.initializeEth1Data(ctx, eth1Data); err != nil {
return nil, err
}
return s, nil
}
@@ -302,6 +312,31 @@ func (s *Service) Status() error {
return nil
}
func (s *Service) updateBeaconnodeStats() {
bs := clientstats.BeaconNodeStats{}
if len(s.httpEndpoints) > 1 {
bs.SyncEth1FallbackConfigured = true
}
if s.IsConnectedToETH1() {
if s.primaryConnected() {
bs.SyncEth1Connected = true
} else {
bs.SyncEth1FallbackConnected = true
}
}
s.bsUpdater.Update(bs)
}
func (s *Service) updateCurrHttpEndpoint(endpoint httputils.Endpoint) {
s.currHttpEndpoint = endpoint
s.updateBeaconnodeStats()
}
func (s *Service) updateConnectedETH1(state bool) {
s.connectedETH1 = state
s.updateBeaconnodeStats()
}
// IsConnectedToETH1 checks if the beacon node is connected to a ETH1 Node.
func (s *Service) IsConnectedToETH1() bool {
return s.connectedETH1
@@ -455,7 +490,7 @@ func (s *Service) waitForConnection() {
synced, errSynced := s.isEth1NodeSynced()
// Resume if eth1 node is synced.
if synced {
s.connectedETH1 = true
s.updateConnectedETH1(true)
s.runError = nil
log.WithFields(logrus.Fields{
"endpoint": logutil.MaskCredentialsLogging(s.currHttpEndpoint.Url),
@@ -503,7 +538,7 @@ func (s *Service) waitForConnection() {
continue
}
if synced {
s.connectedETH1 = true
s.updateConnectedETH1(true)
s.runError = nil
log.WithFields(logrus.Fields{
"endpoint": logutil.MaskCredentialsLogging(s.currHttpEndpoint.Url),
@@ -539,7 +574,7 @@ func (s *Service) isEth1NodeSynced() (bool, error) {
// Reconnect to eth1 node in case of any failure.
func (s *Service) retryETH1Node(err error) {
s.runError = err
s.connectedETH1 = false
s.updateConnectedETH1(false)
// Back off for a while before
// resuming dialing the eth1 node.
time.Sleep(backOffPeriod)
@@ -765,7 +800,7 @@ func (s *Service) run(done <-chan struct{}) {
case <-done:
s.isRunning = false
s.runError = nil
s.connectedETH1 = false
s.updateConnectedETH1(false)
log.Debug("Context closed, exiting goroutine")
return
case <-s.headTicker.C:
@@ -895,7 +930,7 @@ func (s *Service) checkDefaultEndpoint() {
// Switch back to primary endpoint and try connecting
// to it again.
s.currHttpEndpoint = primaryEndpoint
s.updateCurrHttpEndpoint(primaryEndpoint)
s.retryETH1Node(nil)
}
@@ -916,12 +951,10 @@ func (s *Service) fallbackToNextEndpoint() {
if nextIndex >= totalEndpoints {
nextIndex = 0
}
// Exit early if we have the same index.
if nextIndex == currIndex {
return
if nextIndex != currIndex {
log.Infof("Falling back to alternative endpoint: %s", logutil.MaskCredentialsLogging(s.currHttpEndpoint.Url))
}
s.currHttpEndpoint = s.httpEndpoints[nextIndex]
log.Infof("Falling back to alternative endpoint: %s", logutil.MaskCredentialsLogging(s.currHttpEndpoint.Url))
s.updateCurrHttpEndpoint(s.httpEndpoints[nextIndex])
}
// initializes our service from the provided eth1data object by initializing all the relevant
@@ -1032,3 +1065,7 @@ func eth1HeadIsBehind(timestamp uint64) bool {
// check that web3 client is syncing
return time.Unix(int64(timestamp), 0).Before(timeout)
}
func (s *Service) primaryConnected() bool {
return s.currHttpEndpoint.Equals(s.httpEndpoints[0])
}

View File

@@ -21,6 +21,7 @@ import (
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
protodb "github.com/prysmaticlabs/prysm/proto/beacon/db"
"github.com/prysmaticlabs/prysm/shared/clientstats"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/httputils"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -535,6 +536,16 @@ func TestNewService_Eth1HeaderRequLimit(t *testing.T) {
assert.Equal(t, uint64(150), s2.cfg.Eth1HeaderReqLimit, "unable to set eth1HeaderRequestLimit")
}
type mockBSUpdater struct {
lastBS clientstats.BeaconNodeStats
}
func (mbs *mockBSUpdater) Update(bs clientstats.BeaconNodeStats) {
mbs.lastBS = bs
}
var _ BeaconNodeStatsUpdater = &mockBSUpdater{}
func TestServiceFallbackCorrectly(t *testing.T) {
firstEndpoint := "A"
secondEndpoint := "B"
@@ -543,22 +554,27 @@ func TestServiceFallbackCorrectly(t *testing.T) {
require.NoError(t, err, "Unable to set up simulated backend")
beaconDB := dbutil.SetupDB(t)
mbs := &mockBSUpdater{}
s1, err := NewService(context.Background(), &Web3ServiceConfig{
HttpEndpoints: []string{firstEndpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
HttpEndpoints: []string{firstEndpoint},
DepositContract: testAcc.ContractAddr,
BeaconDB: beaconDB,
BeaconNodeStatsUpdater: mbs,
})
s1.bsUpdater = mbs
require.NoError(t, err)
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
// Stay at the first endpoint.
s1.fallbackToNextEndpoint()
assert.Equal(t, firstEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, false, mbs.lastBS.SyncEth1FallbackConfigured, "SyncEth1FallbackConfigured in clientstats update should be false when only 1 endpoint is configured")
s1.httpEndpoints = append(s1.httpEndpoints, httputils.Endpoint{Url: secondEndpoint})
s1.fallbackToNextEndpoint()
assert.Equal(t, secondEndpoint, s1.currHttpEndpoint.Url, "Unexpected http endpoint")
assert.Equal(t, true, mbs.lastBS.SyncEth1FallbackConfigured, "SyncEth1FallbackConfigured in clientstats update should be true when > 1 endpoint is configured")
thirdEndpoint := "C"
fourthEndpoint := "D"