Move Shared/ Subpackages Into Monitoring/ Folder (#9591)

* add tracing

* monitoring pkg

* move prom

* Add client stats

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Raul Jordan
2021-09-14 15:59:51 -05:00
committed by GitHub
parent 5b37deb1a6
commit 9935ca3733
102 changed files with 270 additions and 281 deletions

View File

@@ -0,0 +1,9 @@
load("@prysm//tools/go:def.bzl", "go_library")
# Library target for the monitoring/backup package; its only source is the
# HTTP backup handler and its only dependency is logrus.
go_library(
name = "go_default_library",
srcs = ["http_backup_handler.go"],
importpath = "github.com/prysmaticlabs/prysm/monitoring/backup",
visibility = ["//visibility:public"],
deps = ["@com_github_sirupsen_logrus//:go_default_library"],
)

View File

@@ -0,0 +1,36 @@
package backup
import (
"context"
"fmt"
"net/http"
"github.com/sirupsen/logrus"
)
// BackupExporter defines the method a type must provide so that
// BackupHandler can ask it to create a backup.
type BackupExporter interface {
// Backup is called by BackupHandler with the handler's configured output
// directory as outputPath; permissionOverride is true when the incoming
// request carries a "permissionOverride" query parameter.
Backup(ctx context.Context, outputPath string, permissionOverride bool) error
}
// BackupHandler returns an HTTP handler func that asks bk to create a new
// database backup in outputDir each time it is invoked.
//
// The handler replies 200 with the body "OK" when the backup succeeds and 500
// with an empty body when it fails. The backup runs with a background context,
// so it is not tied to the lifetime of the incoming request.
func BackupHandler(bk BackupExporter, outputDir string) func(http.ResponseWriter, *http.Request) {
	logger := logrus.WithField("prefix", "db")

	handle := func(w http.ResponseWriter, r *http.Request) {
		logger.Debug("Creating database backup from HTTP webhook")

		// Only the presence of the query parameter matters; its value is ignored.
		_, override := r.URL.Query()["permissionOverride"]

		err := bk.Backup(context.Background(), outputDir, override)
		if err != nil {
			logger.WithError(err).Error("Failed to create backup")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusOK)
		if _, err := fmt.Fprint(w, "OK"); err != nil {
			logger.WithError(err).Error("Failed to write OK")
		}
	}
	return handle
}

View File

@@ -0,0 +1,30 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
# Library target for the monitoring/clientstats package: the scraper/updater
# interfaces, the prometheus scrapers, the stats payload types and the updaters.
go_library(
name = "go_default_library",
srcs = [
"interfaces.go",
"scrapers.go",
"types.go",
"updaters.go",
],
importpath = "github.com/prysmaticlabs/prysm/monitoring/clientstats",
visibility = ["//visibility:public"],
deps = [
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_prometheus_client_model//go:go_default_library",
"@com_github_prometheus_prom2json//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
# Unit tests for the scrapers; embeds the library so the tests can reach
# unexported identifiers.
go_test(
name = "go_default_test",
srcs = ["scrapers_test.go"],
embed = [":go_default_library"],
deps = [
"//shared/testutil/require:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -0,0 +1,121 @@
# Client stats reporting
## Specification
The request JSON object is a non-nested object with the following properties. The process refers to which process data is associated with the request.
|Property |Type |Process |Source |Description |
|-----------------------------------|-------------|---------------------|-------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|version |int |Every request |hard-coded |Stats data specification version, currently only `1` is accepted |
|timestamp |long |Every request |system time |Unix timestamp in milliseconds |
|process |string (enum)|Every request |hard-coded / cli flag |Enum values: validator, beaconnode, system |
|cpu_process_seconds_total |long |beaconnode, validator|prom: process_cpu_seconds_total |CPU seconds consumed by the process |
|memory_process_bytes |long |beaconnode, validator|prom: process_resident_memory_bytes |Number of bytes allocated to the process |
|client_name |string |beaconnode, validator|hard-coded to "prysm" |Name of the client type. Ex: prysm, lighthouse, nimbus, teku |
|client_version |string |beaconnode, validator|prom: prysm_version (label: version) |Client version. Ex: 1.0.0-beta.0 |
|client_build |int |beaconnode, validator|prom: prysm_version (label: buildDate) |Integer representation of build for easier comparison |
|disk_beaconchain_bytes_total |long |beaconchain |prom: bcnode_disk_beaconchain_bytes_total |The amount of data consumed on disk by the beacon chain's database. |
|network_libp2p_bytes_total_receive |long |beaconchain |(currently unsupported) |The number of bytes received via libp2p traffic |
|network_libp2p_bytes_total_transmit|long |beaconchain |(currently unsupported) |The number of bytes transmitted via libp2p traffic |
|network_peers_connected |int |beaconchain |prom: p2p_peer_count (label: state="Connected") |The number of peers currently connected to the beacon chain |
|sync_eth1_connected |bool |beaconchain |prom: powchain_sync_eth1_connected |Whether or not the beacon chain node is connected to a _synced_ eth1 node |
|sync_eth2_synced |bool |beaconchain |prom: beacon_clock_time_slot (true if this equals prom: beacon_head_slot)|Whether or not the beacon chain node is in sync with the beacon chain network |
|sync_beacon_head_slot |long |beaconchain |prom: beacon_head_slot |The head slot number. |
|sync_eth1_fallback_configured |bool |beaconchain |prom: powchain_sync_eth1_fallback_configured |Whether or not the beacon chain node has a fallback eth1 endpoint configured. |
|sync_eth1_fallback_connected |bool |beaconchain |prom: powchain_sync_eth1_fallback_connected |Whether or not the beacon chain node is connected to a fallback eth1 endpoint. A true value indicates a failed or interrupted connection with the primary eth1 endpoint.|
|slasher_active |bool |beaconchain |(coming soon) |Whether or not slasher functionality is enabled. |
|sync_eth2_fallback_configured |bool |validator |(currently unsupported) |Whether or not the process has a fallback eth2 endpoint configured |
|sync_eth2_fallback_connected |bool |validator |(currently unsupported) |Whether or not the process has connected to the failover eth2 endpoint. A true value indicates a failed or interrupted connection with the primary eth2 endpoint. |
|validator_total |int |validator |prom: validator_statuses (count of all peers) |The number of validating keys in use. |
|validator_active |int |validator |prom: validator_statuses (count of peers w/ "ACTIVE" status label) |The number of validator keys that are currently active. |
|cpu_cores |int |system |(currently unsupported) |The number of CPU cores available on the host machine |
|cpu_threads |int |system |(currently unsupported) |The number of CPU threads available on the host machine |
|cpu_node_system_seconds_total |long |system |(currently unsupported) |Overall CPU seconds observed on the host machine for all processes. |
|cpu_node_user_seconds_total |long |system |(currently unsupported) |?? |
|cpu_node_iowait_seconds_total |long |system |(currently unsupported) |?? |
|cpu_node_idle_seconds_total |long |system |(currently unsupported) |?? |
|memory_node_bytes_total |long |system |(currently unsupported) |?? |
|memory_node_bytes_free |long |system |(currently unsupported) |?? |
|memory_node_bytes_cached |long |system |(currently unsupported) |?? |
|memory_node_bytes_buffers |long |system |(currently unsupported) |?? |
|disk_node_bytes_total |long |system |(currently unsupported) |?? |
|disk_node_bytes_free |long |system |(currently unsupported) |?? |
|disk_node_io_seconds |long |system |(currently unsupported) |?? |
|disk_node_reads_total |long |system |(currently unsupported) |?? |
|disk_node_writes_total |long |system |(currently unsupported) |?? |
|network_node_bytes_total_receive |long |system |(currently unsupported) |?? |
|network_node_bytes_total_transmit |long |system |(currently unsupported) |?? |
|misc_node_boot_ts_seconds |long |system |(currently unsupported) |?? |
|misc_os |string |system |(currently unsupported) |Enum values: lin, win, mac, unk |
The client stats reporter will submit a request object for each process type. The report request may
submit a list of data or a single JSON object.
### Examples
POST https://beaconcha.in/api/v1/stats/$API_KEY/$MACHINE_NAME
**Single object payload**
```json
{
"version": 1,
"timestamp": 11234567,
"process": "validator",
"cpu_process_seconds_total": 1234567,
"memory_process_bytes": 654321,
"client_name": "lighthouse",
"client_version": "1.1.2",
"client_build": 12,
"sync_eth2_fallback_configured": false,
"sync_eth2_fallback_connected": false,
"validator_total": 3,
"validator_active": 2
}
```
**Multiple object payload**
```json
[
{
"version":1,
"timestamp":1618835497239,
"process":"beaconnode",
"cpu_process_seconds_total":6925,
"memory_process_bytes":1175138304,
"client_name":"lighthouse",
"client_version":"1.1.3",
"client_build":42,
"sync_eth2_fallback_configured":false,
"sync_eth2_fallback_connected":false,
"validator_active":1,
"validator_total":1
},
{
"version":1,
"timestamp":1618835497258,
"process":"system",
"cpu_cores":4,
"cpu_threads":8,
"cpu_node_system_seconds_total":1953818,
"cpu_node_user_seconds_total":229215,
"cpu_node_iowait_seconds_total":3761,
"cpu_node_idle_seconds_total":1688619,
"memory_node_bytes_total":33237434368,
"memory_node_bytes_free":500150272,
"memory_node_bytes_cached":13904945152,
"memory_node_bytes_buffers":517832704,
"disk_node_bytes_total":250436972544,
"disk_node_bytes_free":124707479552,
"disk_node_io_seconds":0,
"disk_node_reads_total":3362272,
"disk_node_writes_total":47766864,
"network_node_bytes_total_receive":26546324572,
"network_node_bytes_total_transmit":12057786467,
"misc_node_boot_ts_seconds":1617707420,
"misc_os":"unk"
}
]
```

View File

@@ -0,0 +1,19 @@
package clientstats
import "io"
// A Scraper polls the data source it has been configured with
// and interprets the content to produce a client-stats process
// metric. Scrapers currently exist to produce 'validator' and
// 'beaconnode' metric types.
type Scraper interface {
// Scrape returns a reader over the scraped payload, or an error if the
// data source could not be read. The scrapers in this package return the
// payload JSON-encoded.
Scrape() (io.Reader, error)
}
// An Updater can take the io.Reader created by Scraper and
// send it to a data sink for consumption. An Updater is used
// for instance to send the scraped data for a beacon-node to
// a remote client-stats endpoint.
type Updater interface {
// Update consumes the reader and delivers its content to the sink,
// returning any error encountered while doing so.
Update(io.Reader) error
}

View File

@@ -0,0 +1,288 @@
package clientstats
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prom2json"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
)
// beaconNodeScraper is the Scraper implementation for a beacon-node process.
type beaconNodeScraper struct {
// url is the prometheus endpoint that Scrape fetches.
url string
// tripper is passed to the prometheus fetch as its HTTP transport; it is
// left nil outside of tests so that the default transport is used.
tripper http.RoundTripper
}
// Scrape fetches the beacon-node prometheus metrics, converts them into a
// BeaconNodeStats value and returns that value JSON-encoded. A failed fetch
// yields a nil reader and the fetch error.
func (bc *beaconNodeScraper) Scrape() (io.Reader, error) {
	log.Infof("Scraping beacon-node at %s", bc.url)

	families, err := scrapeProm(bc.url, bc.tripper)
	if err != nil {
		return nil, err
	}

	payload, err := json.Marshal(populateBeaconNodeStats(families))
	return bytes.NewBuffer(payload), err
}
// NewBeaconNodeScraper constructs a Scraper capable of scraping
// the prometheus endpoint of a beacon-node process and producing
// the json body for the beaconnode client-stats process type.
// The transport is left unset, so the default HTTP transport is used.
func NewBeaconNodeScraper(promExpoURL string) Scraper {
	scraper := new(beaconNodeScraper)
	scraper.url = promExpoURL
	return scraper
}
// validatorScraper is the Scraper implementation for a validator process.
type validatorScraper struct {
// url is the prometheus endpoint that Scrape fetches.
url string
// tripper is passed to the prometheus fetch as its HTTP transport; it is
// left nil outside of tests so that the default transport is used.
tripper http.RoundTripper
}
// Scrape fetches the validator prometheus metrics, converts them into a
// ValidatorStats value and returns that value JSON-encoded. A failed fetch
// yields a nil reader and the fetch error.
func (vc *validatorScraper) Scrape() (io.Reader, error) {
	log.Infof("Scraping validator at %s", vc.url)

	families, err := scrapeProm(vc.url, vc.tripper)
	if err != nil {
		return nil, err
	}

	payload, err := json.Marshal(populateValidatorStats(families))
	return bytes.NewBuffer(payload), err
}
// NewValidatorScraper constructs a Scraper capable of scraping
// the prometheus endpoint of a validator process and producing
// the json body for the validator client-stats process type.
// The transport is left unset, so the default HTTP transport is used.
func NewValidatorScraper(promExpoURL string) Scraper {
	scraper := new(validatorScraper)
	scraper.url = promExpoURL
	return scraper
}
// scrapeProm fetches the prometheus exposition served at url and returns the
// metric families it contains, keyed by family name. On failure it returns a
// nil map and the fetch/parse error.
//
// note on tripper -- under the hood FetchMetricFamilies constructs an http.Client,
// which, if transport is nil, will just use the DefaultTransport, so we
// really only bother specifying the transport in tests, otherwise we let
// the zero-value (which is nil) flow through so that the default transport
// will be used.
func scrapeProm(url string, tripper http.RoundTripper) (map[string]*dto.MetricFamily, error) {
	mfChan := make(chan *dto.MetricFamily)
	// Buffered, and always written exactly once: the fetch goroutine can hand
	// over its outcome and exit even if nobody is left to receive it, and the
	// loop below can always learn whether the fetch succeeded.
	errChan := make(chan error, 1)
	go func() {
		// FetchMetricFamilies handles the different prometheus exposition
		// formats for us, at the cost of the channel-based API consumed below.
		errChan <- prom2json.FetchMetricFamilies(url, mfChan, tripper)
	}()

	result := make(map[string]*dto.MetricFamily)
	for {
		select {
		case fam, open := <-mfChan:
			if !open {
				// The producer closes the channel when it is done, including on
				// failure, so the close alone does not mean success: wait for
				// the fetch outcome before reporting a result.
				if err := <-errChan; err != nil {
					return nil, err
				}
				return result, nil
			}
			result[fam.GetName()] = fam
		case err := <-errChan:
			// The fetch has returned. Sends on mfChan are unbuffered, so every
			// family it delivered has already been received above.
			if err != nil {
				return nil, err
			}
			return result, nil
		}
	}
}
// metricMap indexes scraped prometheus metric families by family name.
type metricMap map[string]*dto.MetricFamily

// getFamily looks up the metric family called name and returns an error
// naming the family when the scrape did not contain it.
func (mm metricMap) getFamily(name string) (*dto.MetricFamily, error) {
	if family, found := mm[name]; found {
		return family, nil
	}
	return nil, fmt.Errorf("scraper did not find metric family %s", name)
}
var (
	// now is the clock used to timestamp API messages; it is a variable so
	// that tests can overwrite it.
	now = time.Now

	// nanosPerMilli is the number of nanoseconds in one millisecond.
	nanosPerMilli = int64(time.Millisecond / time.Nanosecond)
)
// populateAPIMessage builds the APIMessage header for processName, stamped
// with the current time in unix milliseconds and the package's APIVersion.
func populateAPIMessage(processName string) APIMessage {
	var msg APIMessage
	msg.APIVersion = APIVersion
	msg.ProcessName = processName
	// UnixNano / 1e6 truncates to whole milliseconds.
	msg.Timestamp = now().UnixNano() / nanosPerMilli
	return msg
}
// populateCommonStats extracts the metrics shared by the beaconnode and
// validator process types from the scraped families in pf.
//
// Every metric is optional: when a family is missing (or carries no samples)
// the condition is logged at debug level and the corresponding field keeps
// its zero value.
func populateCommonStats(pf metricMap) CommonStats {
	cs := CommonStats{ClientName: ClientName}

	// firstMetric returns the first sample of the named family, or nil after
	// logging when the family is absent or empty. The emptiness check guards
	// the index below against a panic on a family without samples.
	firstMetric := func(name string) *dto.Metric {
		f, err := pf.getFamily(name)
		if err != nil {
			log.WithError(err).Debugf("Failed to get %s", name)
			return nil
		}
		if len(f.Metric) == 0 {
			log.Debugf("Metric family %s has no samples", name)
			return nil
		}
		return f.Metric[0]
	}

	if m := firstMetric("process_cpu_seconds_total"); m != nil {
		// float64->int64: truncates fractional seconds
		cs.CPUProcessSecondsTotal = int64(m.Counter.GetValue())
	}

	if m := firstMetric("process_resident_memory_bytes"); m != nil {
		cs.MemoryProcessBytes = int64(m.Gauge.GetValue())
	}

	if m := firstMetric("prysm_version"); m != nil {
		// The version information is carried in the labels, not the value.
		for _, l := range m.GetLabel() {
			switch l.GetName() {
			case "version":
				cs.ClientVersion = l.GetValue()
			case "buildDate":
				// Parse straight to 64 bits so the result does not depend on
				// the platform's int size.
				buildDate, err := strconv.ParseInt(l.GetValue(), 10, 64)
				if err != nil {
					log.WithError(err).Debug("Failed to retrieve buildDate label from the prysm_version metric")
					continue
				}
				cs.ClientBuild = buildDate
			}
		}
	}

	return cs
}
// populateBeaconNodeStats builds the beaconnode client-stats payload from the
// scraped metric families in pf.
//
// Every metric is optional: when a family is missing (or carries no samples)
// the condition is logged at debug level and the corresponding field keeps
// its zero value.
func populateBeaconNodeStats(pf metricMap) BeaconNodeStats {
	bs := BeaconNodeStats{}
	bs.CommonStats = populateCommonStats(pf)
	bs.APIMessage = populateAPIMessage(BeaconNodeProcessName)

	// gauge returns the value of the first sample of the named gauge family,
	// truncated to int64. ok is false (after logging) when the family is
	// absent or empty; the emptiness check guards the index against a panic.
	gauge := func(name string) (value int64, ok bool) {
		f, err := pf.getFamily(name)
		if err != nil {
			log.WithError(err).Debugf("Failed to get %s", name)
			return 0, false
		}
		if len(f.Metric) == 0 {
			log.Debugf("Metric family %s has no samples", name)
			return 0, false
		}
		return int64(f.Metric[0].Gauge.GetValue()), true
	}

	if v, ok := gauge("beacon_head_slot"); ok {
		bs.SyncBeaconHeadSlot = v
	}

	// The node counts as synced when the wall-clock slot equals the head slot
	// read just above.
	if v, ok := gauge("beacon_clock_time_slot"); ok {
		bs.SyncEth2Synced = v == bs.SyncBeaconHeadSlot
	}

	if v, ok := gauge("bcnode_disk_beaconchain_bytes_total"); ok {
		bs.DiskBeaconchainBytesTotal = v
	}

	// p2p_peer_count has one sample per peer state; only the sample labelled
	// state="Connected" is reported.
	if f, err := pf.getFamily("p2p_peer_count"); err != nil {
		log.WithError(err).Debug("Failed to get p2p_peer_count")
	} else {
		for _, m := range f.Metric {
			for _, l := range m.GetLabel() {
				if l.GetName() == "state" && l.GetValue() == "Connected" {
					bs.NetworkPeersConnected = int64(m.Gauge.GetValue())
				}
			}
		}
	}

	// The eth1 gauges are booleans encoded as 0/1.
	if v, ok := gauge("powchain_sync_eth1_connected"); ok {
		bs.SyncEth1Connected = v == 1
	}
	if v, ok := gauge("powchain_sync_eth1_fallback_configured"); ok {
		bs.SyncEth1FallbackConfigured = v == 1
	}
	if v, ok := gauge("powchain_sync_eth1_fallback_connected"); ok {
		bs.SyncEth1FallbackConnected = v == 1
	}

	return bs
}
// statusIsActive reports whether statusCode, interpreted as an
// eth.ValidatorStatus, has the string form "ACTIVE".
func statusIsActive(statusCode int64) bool {
	return eth.ValidatorStatus(statusCode).String() == "ACTIVE"
}
// populateValidatorStats builds the validator client-stats payload from the
// scraped metric families in pf. ValidatorTotal counts every sample of the
// validator_statuses family and ValidatorActive counts those whose value maps
// to the ACTIVE status; both stay zero when the family is missing.
func populateValidatorStats(pf metricMap) ValidatorStats {
	vs := ValidatorStats{}
	vs.CommonStats = populateCommonStats(pf)
	vs.APIMessage = populateAPIMessage(ValidatorProcessName)

	statuses, err := pf.getFamily("validator_statuses")
	if err != nil {
		log.WithError(err).Debug("Failed to get validator_statuses")
		return vs
	}

	for _, sample := range statuses.Metric {
		vs.ValidatorTotal++
		if statusIsActive(int64(sample.Gauge.GetValue())) {
			vs.ValidatorActive++
		}
	}
	return vs
}

View File

@@ -0,0 +1,296 @@
package clientstats
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
// init raises the global logrus level to debug so that the debug-level
// messages emitted by the scrapers are produced and can be asserted on
// (see TestBadInput).
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
// mockRT is an http.RoundTripper that answers every request with a canned
// response instead of touching the network.
type mockRT struct {
	// body is returned as the response body.
	body string
	// status is the response status text; when empty it is derived from the
	// status code.
	status string
	// statusCode is the response status code; zero means 200 OK.
	statusCode int
}

// RoundTrip ignores the request and returns the canned response described by
// rt. It never returns an error.
func (rt *mockRT) RoundTrip(_ *http.Request) (*http.Response, error) {
	code := rt.statusCode
	if code == 0 {
		code = http.StatusOK
	}
	status := rt.status
	if status == "" {
		status = http.StatusText(code)
	}
	return &http.Response{
		Status:     status,
		StatusCode: code,
		Body:       io.NopCloser(strings.NewReader(rt.body)),
	}, nil
}

// Compile-time check that mockRT satisfies http.RoundTripper.
var _ http.RoundTripper = (*mockRT)(nil)
// TestBeaconNodeScraper scrapes the canonical prometheus fixture and checks
// every populated field of the resulting beaconnode payload.
func TestBeaconNodeScraper(t *testing.T) {
	scraper := &beaconNodeScraper{tripper: &mockRT{body: prometheusTestBody}}
	reader, err := scraper.Scrape()
	require.NoError(t, err, "Unexpected error calling beaconNodeScraper.Scrape")

	var got BeaconNodeStats
	err = json.NewDecoder(reader).Decode(&got)
	require.NoError(t, err, "Unexpected error decoding result of beaconNodeScraper.Scrape")

	// Fields inherited from CommonStats.
	require.Equal(t, int64(225), got.CPUProcessSecondsTotal)
	require.Equal(t, int64(1166630912), got.MemoryProcessBytes)
	require.Equal(t, int64(1619586241), got.ClientBuild)
	require.Equal(t, "v1.3.8-hotfix+6c0942", got.ClientVersion)
	require.Equal(t, "prysm", got.ClientName)

	// Fields specific to BeaconNodeStats.
	require.Equal(t, int64(256552), got.SyncBeaconHeadSlot)
	require.Equal(t, true, got.SyncEth2Synced)
	require.Equal(t, int64(7365341184), got.DiskBeaconchainBytesTotal)
	require.Equal(t, int64(37), got.NetworkPeersConnected)
	require.Equal(t, true, got.SyncEth1Connected)
	require.Equal(t, true, got.SyncEth1FallbackConfigured)
	require.Equal(t, true, got.SyncEth1FallbackConnected)
}
// scrapeBeaconNodeStats wraps up all the scrape logic so tests can focus on
// data cases and assertions: it serves body through a mock transport, runs
// the beacon-node scraper and decodes the JSON it produces. Fixtures without
// a trailing newline are rejected up front.
func scrapeBeaconNodeStats(body string) (*BeaconNodeStats, error) {
	if !strings.HasSuffix(body, "\n") {
		return nil, fmt.Errorf("bad test fixture -- make sure there is a trailing newline unless you want to waste time debugging tests")
	}

	scraper := &beaconNodeScraper{tripper: &mockRT{body: body}}
	reader, err := scraper.Scrape()
	if err != nil {
		return nil, err
	}

	stats := new(BeaconNodeStats)
	err = json.NewDecoder(reader).Decode(stats)
	return stats, err
}
func TestInvertEth1Metrics(t *testing.T) {
cases := []struct {
key string
body string
test func(*BeaconNodeStats) bool
}{
{
key: "SyncEth1Connected",
body: strings.Replace(prometheusTestBody, "powchain_sync_eth1_connected 1", "powchain_sync_eth1_connected 0", 1),
test: func(bs *BeaconNodeStats) bool {
return bs.SyncEth1Connected == false && bs.SyncEth1FallbackConfigured == true && bs.SyncEth1FallbackConnected == true
},
},
{
key: "SyncEth1FallbackConfigured",
body: strings.Replace(prometheusTestBody, "powchain_sync_eth1_fallback_configured 1", "powchain_sync_eth1_fallback_configured 0", 1),
test: func(bs *BeaconNodeStats) bool {
return bs.SyncEth1Connected == true && bs.SyncEth1FallbackConfigured == false && bs.SyncEth1FallbackConnected == true
},
},
{
key: "SyncEth1FallbackConnected",
body: strings.Replace(prometheusTestBody, "powchain_sync_eth1_fallback_connected 1", "powchain_sync_eth1_fallback_connected 0", 1),
test: func(bs *BeaconNodeStats) bool {
return bs.SyncEth1Connected == true && bs.SyncEth1FallbackConfigured == true && bs.SyncEth1FallbackConnected == false
},
},
}
for _, c := range cases {
bs, err := scrapeBeaconNodeStats(c.body)
require.NoError(t, err)
require.Equal(t, true, c.test(bs), "BeaconNodeStats.%s was not false, with prometheus body=%s", c.key, c.body)
}
}
func TestFalseEth2Synced(t *testing.T) {
bnScraper := beaconNodeScraper{}
eth2NotSynced := strings.Replace(prometheusTestBody, "beacon_head_slot 256552", "beacon_head_slot 256559", 1)
bnScraper.tripper = &mockRT{body: eth2NotSynced}
r, err := bnScraper.Scrape()
require.NoError(t, err, "Unexpected error calling beaconNodeScraper.Scrape")
bs := &BeaconNodeStats{}
err = json.NewDecoder(r).Decode(bs)
require.NoError(t, err, "Unexpected error decoding result of beaconNodeScraper.Scrape")
require.Equal(t, false, bs.SyncEth2Synced)
}
// TestValidatorScraper scrapes a fixture containing one validator per status
// and checks the common stats plus the total/active validator counts.
func TestValidatorScraper(t *testing.T) {
	scraper := &validatorScraper{tripper: &mockRT{body: statusFixtureOneOfEach + prometheusTestBody}}
	reader, err := scraper.Scrape()
	require.NoError(t, err, "Unexpected error calling validatorScraper.Scrape")

	var got ValidatorStats
	err = json.NewDecoder(reader).Decode(&got)
	require.NoError(t, err, "Unexpected error decoding result of validatorScraper.Scrape")

	// Fields inherited from CommonStats.
	require.Equal(t, int64(225), got.CPUProcessSecondsTotal)
	require.Equal(t, int64(1166630912), got.MemoryProcessBytes)
	require.Equal(t, int64(1619586241), got.ClientBuild)
	require.Equal(t, "v1.3.8-hotfix+6c0942", got.ClientVersion)
	require.Equal(t, "prysm", got.ClientName)

	// Fields specific to ValidatorStats.
	require.Equal(t, int64(7), got.ValidatorTotal)
	require.Equal(t, int64(1), got.ValidatorActive)
}
// TestValidatorScraperAllActive checks the counts when every validator in the
// fixture has the ACTIVE status.
func TestValidatorScraperAllActive(t *testing.T) {
	scraper := &validatorScraper{tripper: &mockRT{body: statusFixtureAllActive + prometheusTestBody}}
	reader, err := scraper.Scrape()
	require.NoError(t, err, "Unexpected error calling validatorScraper.Scrape")

	var got ValidatorStats
	err = json.NewDecoder(reader).Decode(&got)
	require.NoError(t, err, "Unexpected error decoding result of validatorScraper.Scrape")

	// Validator counts.
	require.Equal(t, int64(4), got.ValidatorTotal)
	require.Equal(t, int64(4), got.ValidatorActive)
}
// TestValidatorScraperNoneActive checks the counts when no validator in the
// fixture has the ACTIVE status.
func TestValidatorScraperNoneActive(t *testing.T) {
	scraper := &validatorScraper{tripper: &mockRT{body: statusFixtureNoneActive + prometheusTestBody}}
	reader, err := scraper.Scrape()
	require.NoError(t, err, "Unexpected error calling validatorScraper.Scrape")

	var got ValidatorStats
	err = json.NewDecoder(reader).Decode(&got)
	require.NoError(t, err, "Unexpected error decoding result of validatorScraper.Scrape")

	// Validator counts.
	require.Equal(t, int64(6), got.ValidatorTotal)
	require.Equal(t, int64(0), got.ValidatorActive)
}
// mockNowFunc returns a clock function that always reports fixedTime; tests
// assign it to the package-level now hook to freeze time.
func mockNowFunc(fixedTime time.Time) func() time.Time {
	frozen := func() time.Time { return fixedTime }
	return frozen
}
func TestValidatorAPIMessageDefaults(t *testing.T) {
now = mockNowFunc(time.Unix(1619811114, 123456789))
// 1+e6 ns per ms, so 123456789 ns rounded down should be 123 ms
nowMillis := int64(1619811114123)
vScraper := validatorScraper{}
vScraper.tripper = &mockRT{body: statusFixtureOneOfEach + prometheusTestBody}
r, err := vScraper.Scrape()
require.NoError(t, err, "unexpected error from validatorScraper.Scrape()")
vs := &ValidatorStats{}
err = json.NewDecoder(r).Decode(vs)
require.NoError(t, err, "Unexpected error decoding result of validatorScraper.Scrape")
// CommonStats
require.Equal(t, nowMillis, vs.Timestamp, "Unexpected 'timestamp' in client-stats APIMessage struct")
require.Equal(t, APIVersion, vs.APIVersion, "Unexpected 'version' in client-stats APIMessage struct")
require.Equal(t, ValidatorProcessName, vs.ProcessName, "Unexpected value for 'process' in client-stats APIMessage struct")
}
func TestBeaconNodeAPIMessageDefaults(t *testing.T) {
now = mockNowFunc(time.Unix(1619811114, 123456789))
// 1+e6 ns per ms, so 123456789 ns rounded down should be 123 ms
nowMillis := int64(1619811114123)
bScraper := beaconNodeScraper{}
bScraper.tripper = &mockRT{body: prometheusTestBody}
r, err := bScraper.Scrape()
require.NoError(t, err, "unexpected error from beaconNodeScraper.Scrape()")
vs := &BeaconNodeStats{}
err = json.NewDecoder(r).Decode(vs)
require.NoError(t, err, "Unexpected error decoding result of beaconNodeScraper.Scrape")
// CommonStats
require.Equal(t, nowMillis, vs.Timestamp, "Unexpected 'timestamp' in client-stats APIMessage struct")
require.Equal(t, APIVersion, vs.APIVersion, "Unexpected 'version' in client-stats APIMessage struct")
require.Equal(t, BeaconNodeProcessName, vs.ProcessName, "Unexpected value for 'process' in client-stats APIMessage struct")
}
// TestBadInput scrapes an empty exposition and checks that the scraper does
// not fail but logs the metrics it could not find.
func TestBadInput(t *testing.T) {
	logHook := logTest.NewGlobal()

	scraper := &beaconNodeScraper{tripper: &mockRT{body: ""}}
	_, err := scraper.Scrape()

	require.NoError(t, err)
	require.LogsContain(t, logHook, "Failed to get prysm_version")
}
// prometheusTestBody is the canonical prometheus text exposition used by the
// scraper tests. Tests derive variants from it with strings.Replace, so the
// exact sample lines (name, single space, value) matter; the trailing newline
// is required by scrapeBeaconNodeStats.
var prometheusTestBody = `
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 225.09
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.166630912e+09
# HELP prysm_version
# TYPE prysm_version gauge
prysm_version{buildDate="1619586241",commit="51eb1540fa838cdbe467bbeb0e36ee667d449377",version="v1.3.8-hotfix+6c0942"} 1
# HELP validator_count The total number of validators
# TYPE validator_count gauge
validator_count{state="Active"} 210301
validator_count{state="Exited"} 10
validator_count{state="Exiting"} 0
validator_count{state="Pending"} 0
validator_count{state="Slashed"} 0
validator_count{state="Slashing"} 0
# HELP beacon_head_slot Slot of the head block of the beacon chain
# TYPE beacon_head_slot gauge
beacon_head_slot 256552
# HELP beacon_clock_time_slot The current slot based on the genesis time and current clock
# TYPE beacon_clock_time_slot gauge
beacon_clock_time_slot 256552
# HELP bcnode_disk_beaconchain_bytes_total Total hard disk space used by the beaconchain database, in bytes. May include mmap.
# TYPE bcnode_disk_beaconchain_bytes_total gauge
bcnode_disk_beaconchain_bytes_total 7.365341184e+09
# HELP p2p_peer_count The number of peers in a given state.
# TYPE p2p_peer_count gauge
p2p_peer_count{state="Bad"} 1
p2p_peer_count{state="Connected"} 37
p2p_peer_count{state="Connecting"} 0
p2p_peer_count{state="Disconnected"} 62
p2p_peer_count{state="Disconnecting"} 0
# HELP powchain_sync_eth1_connected Boolean indicating whether a fallback eth1 endpoint is currently connected: 0=false, 1=true.
# TYPE powchain_sync_eth1_connected gauge
powchain_sync_eth1_connected 1
# HELP powchain_sync_eth1_fallback_configured Boolean recording whether a fallback eth1 endpoint was configured: 0=false, 1=true.
# TYPE powchain_sync_eth1_fallback_configured gauge
powchain_sync_eth1_fallback_configured 1
# HELP powchain_sync_eth1_fallback_connected Boolean indicating whether a fallback eth1 endpoint is currently connected: 0=false, 1=true.
# TYPE powchain_sync_eth1_fallback_connected gauge
powchain_sync_eth1_fallback_connected 1
`
// statusFixtureOneOfEach is a validator_statuses exposition with seven
// validators, one per status value 0 through 6 (exactly one is ACTIVE).
var statusFixtureOneOfEach = `# HELP validator_statuses validator statuses: 0 UNKNOWN, 1 DEPOSITED, 2 PENDING, 3 ACTIVE, 4 EXITING, 5 SLASHING, 6 EXITED
# TYPE validator_statuses gauge
validator_statuses{pubkey="pk0"} 0
validator_statuses{pubkey="pk1"} 1
validator_statuses{pubkey="pk2"} 2
validator_statuses{pubkey="pk3"} 3
validator_statuses{pubkey="pk4"} 4
validator_statuses{pubkey="pk5"} 5
validator_statuses{pubkey="pk6"} 6
`
// statusFixtureAllActive is a validator_statuses exposition with four
// validators, all of them with status 3 (ACTIVE).
var statusFixtureAllActive = `# HELP validator_statuses validator statuses: 0 UNKNOWN, 1 DEPOSITED, 2 PENDING, 3 ACTIVE, 4 EXITING, 5 SLASHING, 6 EXITED
# TYPE validator_statuses gauge
validator_statuses{pubkey="pk0"} 3
validator_statuses{pubkey="pk1"} 3
validator_statuses{pubkey="pk2"} 3
validator_statuses{pubkey="pk3"} 3
`
// statusFixtureNoneActive is a validator_statuses exposition with six
// validators covering every status value except 3 (ACTIVE).
var statusFixtureNoneActive = `# HELP validator_statuses validator statuses: 0 UNKNOWN, 1 DEPOSITED, 2 PENDING, 3 ACTIVE, 4 EXITING, 5 SLASHING, 6 EXITED
# TYPE validator_statuses gauge
validator_statuses{pubkey="pk0"} 0
validator_statuses{pubkey="pk1"} 1
validator_statuses{pubkey="pk2"} 2
validator_statuses{pubkey="pk3"} 4
validator_statuses{pubkey="pk4"} 5
validator_statuses{pubkey="pk5"} 6
`

View File

@@ -0,0 +1,80 @@
package clientstats
// Fixed values reported in every client-stats message produced by this package.
const (
// ClientName is the value reported in the client_name field.
ClientName = "prysm"
// BeaconNodeProcessName is the process field value for beacon-node stats.
BeaconNodeProcessName = "beaconnode"
// ValidatorProcessName is the process field value for validator stats.
ValidatorProcessName = "validator"
// APIVersion is the value reported in the version field.
APIVersion = 1
)
// APIMessage holds the fields common to all requests to the client-stats API.
// Note that there is a "system" type that we do not currently
// support -- if we did, APIMessage would be present on the system
// messages as well as validator and beaconnode, whereas
// CommonStats would only be part of beaconnode and validator.
type APIMessage struct {
APIVersion int `json:"version"`
Timestamp int64 `json:"timestamp"` // unix timestamp in milliseconds
ProcessName string `json:"process"` // validator, beaconnode, system
}
// CommonStats represents generic metrics that are expected on both
// beaconnode and validator metric types. This type is used for
// marshaling metrics to the POST body sent to the metrics collector.
// Note that some metrics are labeled NA because they are expected
// to be present with their zero-value when not supported by a client.
type CommonStats struct {
CPUProcessSecondsTotal int64 `json:"cpu_process_seconds_total"`
MemoryProcessBytes int64 `json:"memory_process_bytes"`
ClientName string `json:"client_name"`
ClientVersion string `json:"client_version"`
ClientBuild int64 `json:"client_build"`
// TODO(#8849): parse the grpc connection string to determine
// if multiple addresses are present
SyncEth2FallbackConfigured bool `json:"sync_eth2_fallback_configured"`
// N/A -- when multiple addresses are provided to grpc, requests are
// load-balanced between the provided endpoints.
// This is different from a "fallback" configuration where
// the second address is treated as a failover.
SyncEth2FallbackConnected bool `json:"sync_eth2_fallback_connected"`
// NOTE(review): encoding/json appears to flatten embedded structs on its
// own; the "inline" option is presumably ignored -- confirm before relying
// on it.
APIMessage `json:",inline"`
}
// BeaconNodeStats embeds CommonStats and represents metrics specific to
// the beacon-node process. This type is used to marshal metrics data
// to the POST body sent to the metrics collector. To make the connection
// to client-stats clear, BeaconNodeStats is also used by prometheus
// collection code introduced to support client-stats.
// Note that some metrics are labeled NA because they are expected
// to be present with their zero-value when not supported by a client.
type BeaconNodeStats struct {
// TODO(#8850): add support for this after slasher refactor is merged
SlasherActive bool `json:"slasher_active"`
SyncEth1FallbackConfigured bool `json:"sync_eth1_fallback_configured"`
SyncEth1FallbackConnected bool `json:"sync_eth1_fallback_connected"`
SyncEth1Connected bool `json:"sync_eth1_connected"`
SyncEth2Synced bool `json:"sync_eth2_synced"`
DiskBeaconchainBytesTotal int64 `json:"disk_beaconchain_bytes_total"`
// N/A -- would require significant network code changes at this time
NetworkLibp2pBytesTotalReceive int64 `json:"network_libp2p_bytes_total_receive"`
// N/A -- would require significant network code changes at this time
NetworkLibp2pBytesTotalTransmit int64 `json:"network_libp2p_bytes_total_transmit"`
// p2p_peer_count where label "state" == "Connected"
NetworkPeersConnected int64 `json:"network_peers_connected"`
// beacon_head_slot
SyncBeaconHeadSlot int64 `json:"sync_beacon_head_slot"`
CommonStats `json:",inline"`
}
// ValidatorStats embeds CommonStats and represents metrics specific to
// the validator process. This type is used to marshal metrics data
// to the POST body sent to the metrics collector.
// Note that some metrics are labeled NA because they are expected
// to be present with their zero-value when not supported by a client.
type ValidatorStats struct {
// Number of samples in the scraped validator_statuses metric family.
// TODO(#8848): verify whether we can obtain this metric from the validator process
ValidatorTotal int64 `json:"validator_total"`
// Number of validator_statuses samples whose value maps to the ACTIVE status.
// TODO(#8848): verify whether we can obtain this metric from the validator process
ValidatorActive int64 `json:"validator_active"`
CommonStats `json:",inline"`
}

View File

@@ -0,0 +1,58 @@
package clientstats
import (
"bytes"
"fmt"
"io"
"net/http"
)
// genericWriter wraps an arbitrary io.Writer so that it can receive
// client-stats updates through an Update method.
type genericWriter struct {
	io.Writer
}

// Update streams everything readable from r into the wrapped writer and
// returns the first error reported by the copy, if any.
func (gw *genericWriter) Update(r io.Reader) error {
	if _, err := io.Copy(gw, r); err != nil {
		return err
	}
	return nil
}
// NewGenericClientStatsUpdater builds an Updater backed by any io.Writer.
// The cli uses it to write to stdout when no http endpoint is provided,
// so the output can be piped into another program or inspected while
// debugging.
func NewGenericClientStatsUpdater(w io.Writer) Updater {
	updater := genericWriter{Writer: w}
	return &updater
}
// httpPoster sends client-stats updates to a fixed URL with HTTP POST.
type httpPoster struct {
	url    string
	client *http.Client
}

// Update POSTs the contents of r to the configured URL as application/json.
// It returns nil only when the server answers 200 OK. For any other status
// the response body is read and included in the returned error. If reading
// that body fails, the read error is wrapped with %w so callers can inspect
// it with errors.Is / errors.As.
func (hp *httpPoster) Update(r io.Reader) error {
	resp, err := hp.client.Post(hp.url, "application/json", r)
	if err != nil {
		return err
	}
	defer func() {
		// Best-effort close: the outcome of the request has already been
		// decided, so a close failure is deliberately not reported.
		_ = resp.Body.Close()
	}()

	if resp.StatusCode == http.StatusOK {
		return nil
	}

	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, resp.Body); err != nil {
		return fmt.Errorf("error reading response body for non-200 response status code (%d), err=%w", resp.StatusCode, err)
	}
	return fmt.Errorf("non-200 response status code (%d). response body=%s", resp.StatusCode, buf.String())
}
// NewClientStatsHTTPPostUpdater returns an Updater that delivers updates
// to the endpoint u with an HTTP POST request, using http.DefaultClient.
func NewClientStatsHTTPPostUpdater(u string) Updater {
	poster := httpPoster{
		url:    u,
		client: http.DefaultClient,
	}
	return &poster
}

View File

@@ -0,0 +1,20 @@
load("@prysm//tools/go:def.bzl", "go_library")
# Library target for the monitoring/journald package. The journalhook
# dependency is only selected on android and linux; every other platform
# builds with no extra deps.
go_library(
    name = "go_default_library",
    srcs = [
        "journald.go",
        "journald_linux.go",
    ],
    importpath = "github.com/prysmaticlabs/prysm/monitoring/journald",
    visibility = ["//visibility:public"],
    deps = select({
        "@io_bazel_rules_go//go/platform:android": [
            "@com_github_wercker_journalhook//:go_default_library",
        ],
        "@io_bazel_rules_go//go/platform:linux": [
            "@com_github_wercker_journalhook//:go_default_library",
        ],
        "//conditions:default": [],
    }),
)

View File

@@ -0,0 +1,12 @@
// +build !linux
package journald
import (
"fmt"
)
// Enable always fails on non-Linux builds, where the journald hook is
// not available.
func Enable() error {
	const reason = "journald is not supported in this platform"
	return fmt.Errorf(reason)
}

View File

@@ -0,0 +1,13 @@
// +build linux
package journald
import (
"github.com/wercker/journalhook"
)
// Enable enables the journald logrus hook by calling journalhook.Enable.
// It always returns nil; the error return keeps the signature identical
// to the non-Linux variant of this function.
func Enable() error {
	journalhook.Enable()
	return nil
}

View File

@@ -0,0 +1,12 @@
load("@prysm//tools/go:def.bzl", "go_library")
# Library target for the monitoring/progress package (progress.go), which
# depends on the go-ansi and progressbar/v3 libraries.
go_library(
    name = "go_default_library",
    srcs = ["progress.go"],
    importpath = "github.com/prysmaticlabs/prysm/monitoring/progress",
    visibility = ["//visibility:public"],
    deps = [
        "@com_github_k0kubun_go_ansi//:go_default_library",
        "@com_github_schollz_progressbar_v3//:go_default_library",
    ],
)

View File

@@ -0,0 +1,27 @@
package progress
import (
"fmt"
"github.com/k0kubun/go-ansi"
"github.com/schollz/progressbar/v3"
)
// InitializeProgressBar returns the standard progress bar used in Prysm:
// full width, written to an ANSI-aware stdout, with a green saucer and
// msg as its description. A newline is printed when the bar completes.
func InitializeProgressBar(numItems int, msg string) *progressbar.ProgressBar {
	options := []progressbar.Option{
		progressbar.OptionFullWidth(),
		progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
		progressbar.OptionEnableColorCodes(true),
		progressbar.OptionSetTheme(progressbar.Theme{
			Saucer:        "[green]=[reset]",
			SaucerHead:    "[green]>[reset]",
			SaucerPadding: " ",
			BarStart:      "[",
			BarEnd:        "]",
		}),
		progressbar.OptionOnCompletion(func() { fmt.Println() }),
		progressbar.OptionSetDescription(msg),
	}
	return progressbar.NewOptions(numItems, options...)
}

View File

@@ -0,0 +1,37 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
# Library target for the monitoring/prometheus package: the metrics
# service, the logrus collector hook, content negotiation helpers and the
# simple /metrics server.
go_library(
    name = "go_default_library",
    srcs = [
        "content_negotiation.go",
        "logrus_collector.go",
        "service.go",
        "simple_server.go",
    ],
    importpath = "github.com/prysmaticlabs/prysm/monitoring/prometheus",
    visibility = ["//visibility:public"],
    deps = [
        "//shared:go_default_library",
        "@com_github_golang_gddo//httputil:go_default_library",
        "@com_github_prometheus_client_golang//prometheus:go_default_library",
        "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
        "@com_github_prometheus_client_golang//prometheus/promhttp:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
    ],
)
# Test target for the monitoring/prometheus package; it embeds the library
# target above so the in-package tests can reach unexported identifiers.
go_test(
    name = "go_default_test",
    size = "small",
    srcs = [
        "logrus_collector_test.go",
        "service_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
        "//shared:go_default_library",
        "//shared/testutil/assert:go_default_library",
        "//shared/testutil/require:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
    ],
)

View File

@@ -0,0 +1,46 @@
# How to monitor with prometheus
## Prerequisites:
- [Prometheus](https://prometheus.io/docs/prometheus/latest/getting_started/) (Install it to scrape metrics and start monitoring)
- (optional) [Grafana](https://grafana.com/grafana/download) (For better graphs)
- (optional) [Setup prometheus+grafana](https://prometheus.io/docs/visualization/grafana/)
## Start scraping services
To start scraping with Prometheus, create or edit the Prometheus config file and add all the services you want to scrape, like this:
```diff
global:
scrape_interval: 15s # By default, scrape targets every 15 seconds.
# Attach these labels to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
external_labels:
monitor: 'codelab-monitor'
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
# Override the global default and scrape targets from this job every 5 seconds.
scrape_interval: 5s
static_configs:
- targets: ['localhost:9090']
+ - job_name: 'beacon-chain'
+ static_configs:
+ - targets: ['localhost:8080']
```
After creating/updating the prometheus file run it:
```sh
$ prometheus --config.file=your-prometheus-file.yml
```
Now, you can add the prometheus server as a data source on grafana and start building your dashboards.
## How to add additional metrics
The prometheus service exports the metrics from the `DefaultRegisterer`, so you only need to register your metrics with the `prometheus` or `promauto` libraries.
To learn more, see the [Go application guide](https://prometheus.io/docs/guides/go-application/).

View File

@@ -0,0 +1,53 @@
package prometheus
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"github.com/golang/gddo/httputil"
)
// Content types this package can negotiate and emit.
const (
	// contentTypePlainText is the media type used for plain text output.
	contentTypePlainText = "text/plain"
	// contentTypeJSON is the media type used for JSON output.
	contentTypeJSON = "application/json"
)
// generatedResponse is a container for response output. It is encoded
// as-is for JSON responses; for plain text responses only Data is written.
type generatedResponse struct {
	// Err is protocol error, if any.
	Err string `json:"error"`

	// Data is response output, if any. The plain text writer expects it to
	// hold a bytes.Buffer value.
	Data interface{} `json:"data"`
}
// negotiateContentType parses "Accept:" header and returns preferred content type string.
func negotiateContentType(r *http.Request) string {
contentTypes := []string{
contentTypePlainText,
contentTypeJSON,
}
return httputil.NegotiateContentType(r, contentTypes, contentTypePlainText)
}
// writeResponse writes response to w in the content type negotiated for r.
// JSON responses encode the whole generatedResponse; plain text responses
// write the bytes of response.Data, which must hold a bytes.Buffer value.
func writeResponse(w http.ResponseWriter, r *http.Request, response generatedResponse) error {
	negotiated := negotiateContentType(r)

	if negotiated == contentTypeJSON {
		w.Header().Set("Content-Type", contentTypeJSON)
		if err := json.NewEncoder(w).Encode(response); err != nil {
			return err
		}
		return nil
	}

	if negotiated != contentTypePlainText {
		// Neither supported type was negotiated: nothing is written.
		return nil
	}

	text, isBuffer := response.Data.(bytes.Buffer)
	if !isBuffer {
		return fmt.Errorf("unexpected data: %v", response.Data)
	}
	if _, err := w.Write(text.Bytes()); err != nil {
		return fmt.Errorf("could not write response body: %w", err)
	}
	return nil
}

View File

@@ -0,0 +1,51 @@
package prometheus
import (
"errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
)
// LogrusCollector is a logrus hook to collect log counters.
type LogrusCollector struct {
	// counterVec is the counter vector incremented by Fire, labelled by
	// log level and prefix.
	counterVec *prometheus.CounterVec
}
var (
	// supportedLevels lists the log levels returned by Levels.
	supportedLevels = []logrus.Level{logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}
	// counterVec is the package-level "log_entries_total" counter, created
	// through promauto and labelled by level and prefix.
	counterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "log_entries_total",
		Help: "Total number of log messages.",
	}, []string{"level", "prefix"})
)

// prefixKey is the log entry data key that carries the prefix label.
const prefixKey = "prefix"

// defaultprefix is the prefix label used when an entry has no prefix field.
const defaultprefix = "global"
// NewLogrusCollector returns a logrus hook that collects log counters.
// The hook wraps the package-level counter vector, which is created once
// at package initialization, so every collector returned by this function
// increments the same "log_entries_total" metric.
func NewLogrusCollector() *LogrusCollector {
	return &LogrusCollector{
		counterVec: counterVec,
	}
}
// Fire increments the log counter for the entry's level and prefix. The
// prefix is read from the entry's "prefix" field and falls back to the
// default prefix when that field is absent. An error is returned when the
// field is present but is not a string.
func (hook *LogrusCollector) Fire(entry *logrus.Entry) error {
	label := defaultprefix
	if raw, present := entry.Data[prefixKey]; present {
		asString, isString := raw.(string)
		if !isString {
			return errors.New("prefix is not a string")
		}
		label = asString
	}
	counter := hook.counterVec.WithLabelValues(entry.Level.String(), label)
	counter.Inc()
	return nil
}
// Levels returns the slice of levels supported by this hook: info, warn
// and error.
func (hook *LogrusCollector) Levels() []logrus.Level {
	return supportedLevels
}

View File

@@ -0,0 +1,108 @@
package prometheus_test
import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"testing"
"time"
"github.com/prysmaticlabs/prysm/monitoring/prometheus"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
log "github.com/sirupsen/logrus"
)
// addr is the host:port the test metrics service listens on and is
// scraped from.
const addr = "127.0.0.1:8989"

// logger is the subset of logging methods the test needs, so that both the
// standard logger and a field-scoped entry can be passed to
// logExampleMessage.
type logger interface {
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}
// TestLogrusCollector starts the metrics service, installs the logrus
// collector hook, emits a known number of log messages per level/prefix
// combination and checks the counter values scraped from /metrics.
func TestLogrusCollector(t *testing.T) {
	service := prometheus.NewService(addr, nil)
	hook := prometheus.NewLogrusCollector()
	log.AddHook(hook)
	go service.Start()
	defer func() {
		err := service.Stop()
		require.NoError(t, err)
	}()

	tests := []struct {
		name   string
		want   int
		count  int
		prefix string
		level  log.Level
	}{
		{"info message with empty prefix", 3, 3, "", log.InfoLevel},
		{"warn message with empty prefix", 2, 2, "", log.WarnLevel},
		{"error message with empty prefix", 1, 1, "", log.ErrorLevel},
		{"error message with prefix", 1, 1, "foo", log.ErrorLevel},
		{"info message with prefix", 3, 3, "foo", log.InfoLevel},
		{"warn message with prefix", 2, 2, "foo", log.WarnLevel},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Label expected on the counter when no prefix field is set.
			prefix := "global"
			for i := 0; i < tt.count; i++ {
				if tt.prefix != "" {
					prefix = tt.prefix
					subLog := log.WithField("prefix", tt.prefix)
					logExampleMessage(subLog, tt.level)
					continue
				}
				logExampleMessage(log.StandardLogger(), tt.level)
			}
			// Brief pause before scraping the metrics endpoint.
			time.Sleep(time.Millisecond)
			metrics := metrics(t)
			count := valueFor(t, metrics, prefix, tt.level)
			if count != tt.want {
				t.Errorf("Expecting %d and receive %d", tt.want, count)
			}
		})
	}
}
// metrics fetches /metrics from the test service and returns the response
// body split into lines. The response body is closed before returning so
// the underlying connection is not leaked across subtests.
func metrics(t *testing.T) []string {
	resp, err := http.Get(fmt.Sprintf("http://%s/metrics", addr))
	require.NoError(t, err)
	defer func() {
		assert.NoError(t, resp.Body.Close())
	}()
	body, err := ioutil.ReadAll(resp.Body)
	require.NoError(t, err)
	return strings.Split(string(body), "\n")
}
// valueFor scans the scraped metric lines for the log_entries_total series
// with the given level and prefix and returns its value as an int. When no
// such line exists it reports a test error and returns 0.
//
// The expected exposition looks like:
//
//	# HELP log_entries_total Total number of log messages.
//	# TYPE log_entries_total counter
//	log_entries_total{level="error",prefix="empty"} 1
func valueFor(t *testing.T, metrics []string, prefix string, level log.Level) int {
	pattern := fmt.Sprintf("log_entries_total{level=\"%s\",prefix=\"%s\"}", level, prefix)
	for i := range metrics {
		if !strings.HasPrefix(metrics[i], pattern) {
			continue
		}
		fields := strings.Split(metrics[i], " ")
		value, err := strconv.ParseFloat(fields[1], 64)
		assert.NoError(t, err)
		return int(value)
	}
	t.Errorf("Pattern \"%s\" not found", pattern)
	return 0
}
// logExampleMessage emits one fixed message on logger at the given level.
// Levels other than info, warn and error are ignored.
func logExampleMessage(logger logger, level log.Level) {
	if level == log.InfoLevel {
		logger.Info("Info message")
		return
	}
	if level == log.WarnLevel {
		logger.Warn("Warning message!")
		return
	}
	if level == log.ErrorLevel {
		logger.Error("Error message!!")
	}
}

View File

@@ -0,0 +1,162 @@
// Package prometheus defines a service which is used for metrics collection
// and health of a node in Prysm.
package prometheus
import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"runtime/debug"
"runtime/pprof"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prysmaticlabs/prysm/shared"
"github.com/sirupsen/logrus"
)
// log is the package logger, tagged with the "prometheus" prefix.
var log = logrus.WithField("prefix", "prometheus")

// Service provides Prometheus metrics via the /metrics route. This route will
// show all the metrics registered with the Prometheus DefaultRegisterer.
type Service struct {
	// server is the HTTP server that serves the registered routes.
	server *http.Server
	// svcRegistry supplies the per-service statuses reported by /healthz.
	svcRegistry *shared.ServiceRegistry
	// failStatus records a listen/serve failure and is returned by Status.
	failStatus error
}
// Handler represents a path and handler func to serve on the same port as /metrics, /healthz, /goroutinez, etc.
type Handler struct {
	// Path is the route pattern the handler is registered under.
	Path string
	// Handler is the function invoked for requests matching Path.
	Handler func(http.ResponseWriter, *http.Request)
}
// NewService builds a Service listening on addr (host:port). An empty host
// matches any IP, so an address like ":2121" is perfectly acceptable.
// Besides /metrics, /healthz and /goroutinez, every entry of
// additionalHandlers is registered on the same mux.
func NewService(addr string, svcRegistry *shared.ServiceRegistry, additionalHandlers ...Handler) *Service {
	svc := &Service{svcRegistry: svcRegistry}

	router := http.NewServeMux()
	metricsHandler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
		MaxRequestsInFlight: 5,
		Timeout:             30 * time.Second,
	})
	router.Handle("/metrics", metricsHandler)
	router.HandleFunc("/healthz", svc.healthzHandler)
	router.HandleFunc("/goroutinez", svc.goroutinezHandler)

	// Register additional handlers.
	for i := range additionalHandlers {
		extra := additionalHandlers[i]
		router.HandleFunc(extra.Path, extra.Handler)
	}

	svc.server = &http.Server{Addr: addr, Handler: router}
	return svc
}
// healthzHandler reports the status of every service in the registry.
// It answers 503 Service Unavailable when at least one service reports a
// non-empty error and 200 OK otherwise. The body is rendered as plain text
// or JSON depending on the request's Accept header.
func (s *Service) healthzHandler(w http.ResponseWriter, r *http.Request) {
	response := generatedResponse{}

	type serviceStatus struct {
		Name   string `json:"service"`
		Status bool   `json:"status"`
		Err    string `json:"error"`
	}
	var hasError bool
	var statuses []serviceStatus
	for k, v := range s.svcRegistry.Statuses() {
		st := serviceStatus{
			Name:   k.String(),
			Status: true,
		}
		if v != nil {
			st.Status = false
			st.Err = v.Error()
			if st.Err != "" {
				hasError = true
			}
		}
		statuses = append(statuses, st)
	}
	response.Data = statuses

	// Headers changed after WriteHeader are not sent, so the JSON content
	// type has to be set before the status code is written.
	contentType := negotiateContentType(r)
	if contentType == contentTypeJSON {
		w.Header().Set("Content-Type", contentTypeJSON)
	}

	if hasError {
		w.WriteHeader(http.StatusServiceUnavailable)
	} else {
		w.WriteHeader(http.StatusOK)
	}

	// Handle plain text content.
	if contentType == contentTypePlainText {
		var buf bytes.Buffer
		for _, st := range statuses {
			var status string
			if st.Status {
				status = "OK"
			} else {
				status = "ERROR, " + st.Err
			}

			if _, err := buf.WriteString(fmt.Sprintf("%s: %s\n", st.Name, status)); err != nil {
				response.Err = err.Error()
				break
			}
		}
		response.Data = buf
	}

	if err := writeResponse(w, r, response); err != nil {
		log.Errorf("Error writing response: %v", err)
	}
}
// goroutinezHandler writes the stack trace returned by debug.Stack,
// followed by the "goroutine" pprof profile at debug level 2. Write
// failures are logged and do not abort the handler.
func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) {
	if _, err := w.Write(debug.Stack()); err != nil {
		log.WithError(err).Error("Failed to write goroutines stack")
	}
	profile := pprof.Lookup("goroutine")
	if err := profile.WriteTo(w, 2); err != nil {
		log.WithError(err).Error("Failed to write pprof goroutines")
	}
}
// Start launches the prometheus service in a background goroutine. It
// first dials the configured address; if something already answers there,
// a warning is logged and the server is not started. Otherwise it serves
// until shutdown and stores any unexpected serve error in failStatus.
func (s *Service) Start() {
	go func() {
		// See if the port is already used.
		conn, dialErr := net.DialTimeout("tcp", s.server.Addr, time.Second)
		if dialErr == nil {
			if err := conn.Close(); err != nil {
				log.WithError(err).Error("Failed to close connection")
			}
			// Something on the port; we cannot use it.
			log.WithField("address", s.server.Addr).Warn("Port already in use; cannot start prometheus service")
			return
		}

		// Nothing on that port; we can use it.
		log.WithField("address", s.server.Addr).Debug("Starting prometheus service")
		serveErr := s.server.ListenAndServe()
		if serveErr == nil || serveErr == http.ErrServerClosed {
			return
		}
		log.Errorf("Could not listen to host:port :%s: %v", s.server.Addr, serveErr)
		s.failStatus = serveErr
	}()
}
// Stop shuts the HTTP server down gracefully, waiting at most two seconds
// before giving up, and returns the shutdown error, if any.
func (s *Service) Stop() error {
	shutdownCtx, release := context.WithTimeout(context.Background(), 2*time.Second)
	defer release()
	return s.server.Shutdown(shutdownCtx)
}
// Status returns the recorded failure of the service, or nil when no
// failure has been recorded.
func (s *Service) Status() error {
	return s.failStatus
}

View File

@@ -0,0 +1,174 @@
package prometheus
import (
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/sirupsen/logrus"
)
// init sets logrus to debug level and discards its output for the tests in
// this package.
func init() {
	logrus.SetLevel(logrus.DebugLevel)
	logrus.SetOutput(ioutil.Discard)
}
// TestLifecycle starts the service, checks that /metrics answers, stops the
// service and checks that the endpoint is no longer reachable.
func TestLifecycle(t *testing.T) {
	prometheusService := NewService(":2112", nil)
	prometheusService.Start()
	// Give service time to start.
	time.Sleep(time.Second)

	// Query the service to ensure it really started.
	resp, err := http.Get("http://localhost:2112/metrics")
	require.NoError(t, err)
	// ContentLength is an int64; compare against the same type so the
	// assertion cannot pass on a type mismatch alone.
	assert.NotEqual(t, int64(0), resp.ContentLength, "Unexpected content length 0")
	require.NoError(t, resp.Body.Close())

	err = prometheusService.Stop()
	require.NoError(t, err)
	// Give service time to stop.
	time.Sleep(time.Second)

	// Query the service to ensure it really stopped.
	resp, err = http.Get("http://localhost:2112/metrics")
	if err == nil {
		// The request unexpectedly succeeded; release the body before the
		// assertion below fails the test.
		_ = resp.Body.Close()
	}
	assert.NotNil(t, err, "Service still running after Stop()")
}
// mockService is a test double whose Status result is set directly through
// its status field.
type mockService struct {
	status error
}

// Start is a no-op.
func (m *mockService) Start() {
}

// Stop is a no-op that always returns nil.
func (m *mockService) Stop() error {
	return nil
}

// Status returns the error currently stored in the mock.
func (m *mockService) Status() error {
	return m.status
}
// TestHealthz checks the plain text /healthz output and status code, first
// with a healthy mock service and then after the mock starts reporting an
// error.
func TestHealthz(t *testing.T) {
	registry := shared.NewServiceRegistry()
	m := &mockService{}
	require.NoError(t, registry.RegisterService(m), "Failed to register service")
	s := NewService("" /*addr*/, registry)

	req, err := http.NewRequest("GET", "/healthz", nil /*reader*/)
	require.NoError(t, err)

	handler := http.HandlerFunc(s.healthzHandler)

	// Healthy service: expect 200 and an OK line.
	rr := httptest.NewRecorder()
	handler.ServeHTTP(rr, req)

	if status := rr.Code; status != http.StatusOK {
		t.Errorf("expected OK status but got %v", rr.Code)
	}

	body := rr.Body.String()
	if !strings.Contains(body, "*prometheus.mockService: OK") {
		t.Errorf("Expected body to contain mockService status, but got %v", body)
	}

	// Failing service: expect 503 and an ERROR line carrying the message.
	m.status = errors.New("something really bad has happened")

	rr = httptest.NewRecorder()
	handler.ServeHTTP(rr, req)
	if status := rr.Code; status != http.StatusServiceUnavailable {
		t.Errorf("expected StatusServiceUnavailable status but got %v", rr.Code)
	}

	body = rr.Body.String()
	if !strings.Contains(
		body,
		"*prometheus.mockService: ERROR, something really bad has happened",
	) {
		t.Errorf("Expected body to contain mockService status, but got %v", body)
	}
}
// TestStatus verifies that Status returns the stored failure unchanged.
func TestStatus(t *testing.T) {
	want := errors.New("failure")
	svc := &Service{failStatus: want}
	got := svc.Status()
	if got != want {
		t.Errorf("Wanted: %v, got: %v", want, got)
	}
}
// TestContentNegotiation exercises /healthz with the default (plain text)
// rendering and with an Accept header asking for JSON, for both a healthy
// and a failing mock service.
func TestContentNegotiation(t *testing.T) {
	t.Run("/healthz all services are ok", func(t *testing.T) {
		registry := shared.NewServiceRegistry()
		m := &mockService{}
		require.NoError(t, registry.RegisterService(m), "Failed to register service")
		s := NewService("", registry)

		req, err := http.NewRequest("GET", "/healthz", nil /* body */)
		require.NoError(t, err)

		// No Accept header: plain text output is expected.
		handler := http.HandlerFunc(s.healthzHandler)
		rr := httptest.NewRecorder()
		handler.ServeHTTP(rr, req)

		body := rr.Body.String()
		if !strings.Contains(body, "*prometheus.mockService: OK") {
			t.Errorf("Expected body to contain mockService status, but got %q", body)
		}

		// Request response as JSON.
		req.Header.Add("Accept", "application/json, */*;q=0.5")
		rr = httptest.NewRecorder()
		handler.ServeHTTP(rr, req)

		body = rr.Body.String()
		expectedJSON := "{\"error\":\"\",\"data\":[{\"service\":\"*prometheus.mockService\",\"status\":true,\"error\":\"\"}]}"
		if !strings.Contains(body, expectedJSON) {
			t.Errorf("Unexpected data, want: %q got %q", expectedJSON, body)
		}
	})

	t.Run("/healthz failed service", func(t *testing.T) {
		registry := shared.NewServiceRegistry()
		m := &mockService{}
		m.status = errors.New("something is wrong")
		require.NoError(t, registry.RegisterService(m), "Failed to register service")
		s := NewService("", registry)

		req, err := http.NewRequest("GET", "/healthz", nil /* body */)
		require.NoError(t, err)

		// No Accept header: plain text output is expected.
		handler := http.HandlerFunc(s.healthzHandler)
		rr := httptest.NewRecorder()
		handler.ServeHTTP(rr, req)

		body := rr.Body.String()
		if !strings.Contains(body, "*prometheus.mockService: ERROR, something is wrong") {
			t.Errorf("Expected body to contain mockService status, but got %q", body)
		}

		// Request response as JSON.
		req.Header.Add("Accept", "application/json, */*;q=0.5")
		rr = httptest.NewRecorder()
		handler.ServeHTTP(rr, req)

		body = rr.Body.String()
		expectedJSON := "{\"error\":\"\",\"data\":[{\"service\":\"*prometheus.mockService\",\"status\":false,\"error\":\"something is wrong\"}]}"
		if !strings.Contains(body, expectedJSON) {
			t.Errorf("Unexpected data, want: %q got %q", expectedJSON, body)
		}
		// A failing service must surface as a 5xx status code.
		if rr.Code < 500 {
			t.Errorf("Expected a server error response code, but got %d", rr.Code)
		}
	})
}

View File

@@ -0,0 +1,17 @@
package prometheus
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// RunSimpleServerOrDie serves /metrics at the given address and blocks
// until the server stops; the error returned by the server is then passed
// to log.Fatal.
func RunSimpleServerOrDie(addr string) {
	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())
	server := &http.Server{
		Addr:    addr,
		Handler: router,
	}
	serveErr := server.ListenAndServe()
	log.Fatal(serveErr)
}

View File

@@ -0,0 +1,18 @@
load("@prysm//tools/go:def.bzl", "go_library")
# Library target for the monitoring/tracing package: error annotation, the
# panic recovery handler and the Jaeger exporter setup.
go_library(
    name = "go_default_library",
    srcs = [
        "errors.go",
        "recovery_interceptor_option.go",
        "tracer.go",
    ],
    importpath = "github.com/prysmaticlabs/prysm/monitoring/tracing",
    visibility = ["//visibility:public"],
    deps = [
        "//shared/version:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
        "@io_opencensus_go//trace:go_default_library",
        "@io_opencensus_go_contrib_exporter_jaeger//:go_default_library",
    ],
)

View File

@@ -0,0 +1,48 @@
#### How to view collected traces
##### Prerequisites:
- [Docker](https://www.docker.com/get-started) (For Jaeger image)
- [Go](https://golang.org/) 1.11+ (For execution traces collected by pprof)
##### Using Jaeger
Tracing is disabled by default; to enable it, use the `--enable-tracing` option.
Jaeger endpoint can be configured with the `--tracing-endpoint` option and defaults to `http://127.0.0.1:14268`.
Run Jaeger:
```sh
$ docker run -d --name jaeger -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14268:14268 -p 9411:9411 jaegertracing/all-in-one:1.6
```
This will start the UI at `http://localhost:16686`
##### Using the Go tool
Tracing is disabled by default; to enable it, use the `--enable-tracing` option.
Run the application using the `--pprof` option to enable pprof (for trace collection).
To collect traces for 5 seconds:
```sh
$ curl http://localhost:6060/debug/pprof/trace?seconds=5 -o trace.out
```
View the trace with:
```sh
$ go tool trace trace.out
2018/05/04 10:39:59 Parsing trace...
2018/05/04 10:39:59 Splitting trace...
2018/05/04 10:39:59 Opening browser. Trace viewer is listening on http://127.0.0.1:51803
```
#### How to collect additional traces
We use the OpenCensus library to create traces. To trace the execution of a p2p
message through the system, we must define [spans](https://godoc.org/go.opencensus.io/trace#Span) around the code that handles the message. To correlate the trace with other spans defined for the same message, use the context passed inside the [Message](https://godoc.org/github.com/prysmaticlabs/prysm/shared/deprecated-p2p#Message) struct to create a span:
```go
var msg p2p.Message
var mySpan *trace.Span
msg.Ctx, mySpan = trace.StartSpan(msg.Ctx, "myOperation")
myOperation()
mySpan.End()
```
Another example on how to define spans can be found here: https://godoc.org/go.opencensus.io/trace#example-StartSpan

View File

@@ -0,0 +1,18 @@
// Package tracing includes useful functions for OpenCensus trace annotations.
package tracing
import (
"go.opencensus.io/trace"
)
// AnnotateError marks span as failed: it adds an "error" attribute set to
// true and sets the span status to unknown with the error's message. It
// does nothing when err is nil. Use it whenever a span experiences an
// error.
func AnnotateError(span *trace.Span, err error) {
	if err != nil {
		span.AddAttributes(trace.BoolAttribute("error", true))
		failure := trace.Status{
			Code:    trace.StatusCodeUnknown,
			Message: err.Error(),
		}
		span.SetStatus(failure)
	}
}

View File

@@ -0,0 +1,31 @@
package tracing
import (
"context"
"errors"
"fmt"
"runtime"
"runtime/debug"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// RecoveryHandlerFunc converts the recovered panic value p into an error.
// When ctx carries a span, the current stack trace is attached to it as a
// "stack" attribute. The resulting error is also logged together with the
// stack before being returned.
func RecoveryHandlerFunc(ctx context.Context, p interface{}) error {
	if span := trace.FromContext(ctx); span != nil {
		span.AddAttributes(trace.StringAttribute("stack", string(debug.Stack())))
	}

	var recovered error
	if runtimeErr, isRuntimeErr := p.(runtime.Error); isRuntimeErr {
		recovered = errors.New(runtimeErr.Error())
	} else {
		recovered = fmt.Errorf("%v", p)
	}

	entry := logrus.WithError(recovered).WithField("stack", string(debug.Stack()))
	entry.Error("gRPC panicked!")
	return recovered
}

View File

@@ -0,0 +1,53 @@
// Package tracing sets up jaeger as an opentracing tool
// for services in Prysm.
package tracing
import (
"errors"
"contrib.go.opencensus.io/exporter/jaeger"
"github.com/prysmaticlabs/prysm/shared/version"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var log = logrus.WithField("prefix", "tracing")

// Setup creates and initializes a new tracing configuration.
// When enable is false, a never-sample configuration is applied and nil is
// returned. Otherwise serviceName must be non-empty; a probability sampler
// using sampleFraction is applied and a Jaeger exporter targeting endpoint
// is created and registered.
func Setup(serviceName, processName, endpoint string, sampleFraction float64, enable bool) error {
	if !enable {
		trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
		return nil
	}
	if serviceName == "" {
		return errors.New("tracing service name cannot be empty")
	}

	samplingConfig := trace.Config{
		DefaultSampler:          trace.ProbabilitySampler(sampleFraction),
		MaxMessageEventsPerSpan: 500,
	}
	trace.ApplyConfig(samplingConfig)

	log.Infof("Starting Jaeger exporter endpoint at address = %s", endpoint)
	exporterOptions := jaeger.Options{
		CollectorEndpoint: endpoint,
		Process: jaeger.Process{
			ServiceName: serviceName,
			Tags: []jaeger.Tag{
				jaeger.StringTag("process_name", processName),
				jaeger.StringTag("version", version.Version()),
			},
		},
		BufferMaxCount: 10000,
		OnError: func(err error) {
			log.WithError(err).Error("Failed to process span")
		},
	}
	exporter, err := jaeger.NewExporter(exporterOptions)
	if err != nil {
		return err
	}

	trace.RegisterExporter(exporter)
	return nil
}