Remove cluster-pk-manager tool (#9165)

* Remove cluster-pk-manager tool

* Go mod tidy
Preston Van Loon
2021-07-08 13:31:41 -05:00
committed by GitHub
parent 9dc3dd04c7
commit 7c23d02c3e
14 changed files with 0 additions and 1104 deletions

go.mod (2 changes)

@@ -7,7 +7,6 @@ require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20200521140103-6c3304613b30
github.com/bazelbuild/buildtools v0.0.0-20200528175155-f4e8394f069d
github.com/bazelbuild/rules_go v0.23.2
github.com/btcsuite/btcd v0.21.0-beta // indirect
github.com/cespare/cp v1.1.1 // indirect
@@ -123,7 +122,6 @@ require (
gopkg.in/d4l3k/messagediff.v1 v1.2.1
gopkg.in/errgo.v2 v2.1.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.18.3
k8s.io/apimachinery v0.18.3
k8s.io/client-go v0.18.3
k8s.io/klog/v2 v2.3.0 // indirect

go.sum (2 changes)

@@ -107,8 +107,6 @@ github.com/aws/aws-sdk-go-v2/service/route53 v1.1.1/go.mod h1:rLiOUrPLW/Er5kRcQ7
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbEWkXs7QRTQpCLGaKIprQW0=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM=
github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
github.com/bazelbuild/buildtools v0.0.0-20200528175155-f4e8394f069d h1:lXjj6ngxx9PVxg6TtlMCbkPATwLFf5dcl9z5Jr3WqGg=
github.com/bazelbuild/buildtools v0.0.0-20200528175155-f4e8394f069d/go.mod h1:5JP0TXzWDHXv8qvxRC4InIazwdyDseBDbzESUMKk1yU=
github.com/bazelbuild/rules_go v0.23.2 h1:Wxu7JjqnF78cKZbsBsARLSXx/jlGaSLCnUV3mTlyHvM=
github.com/bazelbuild/rules_go v0.23.2/go.mod h1:MC23Dc/wkXEyk3Wpq6lCqz0ZAYOZDw2DR5y3N1q2i7M=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=

tools/cluster-pk-manager/README.md

@@ -1,67 +0,0 @@
# Cluster private key management tool

This is a primitive tool for managing and delegating validator private key
assignments within the kubernetes cluster.
## Design
When a validator pod is initializing within the cluster, it requests a private
key for a deposited validator. Since pods are ephemeral and scale up or down
quickly, there needs to be some service to manage private key allocations,
validator deposits, and re-allocation of previously in-use private keys from
terminated pods.

Workflow for bootstrapping a validator pod (the gRPC message shapes involved
are sketched after this list):

1. Request `n` private keys from the pk manager.
1. If unallocated private keys exist (from previously terminated pods), assign
   them to the requesting pod.
1. If fewer than `n` unused keys are available, generate new private keys and
   make deposits on behalf of the newly generated keys.
1. Write the key allocations to a persistent datastore and fulfill the request.
1. The client uses these private keys to act as deposited validators in the
   system.
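
For reference, the request and response messages exchanged with the pk manager
reduce to roughly the following Go shapes. This is a sketch reconstructed from
how the generated `proto/cluster` code is used by the client and server in this
tool, not the actual `.proto` definition:

```go
// Approximate shapes of the proto/cluster messages, reconstructed from usage.
package cluster

type PrivateKeyRequest struct {
	PodName      string // name of the requesting pod
	NumberOfKeys uint64 // how many keys the pod needs
}

type PrivateKeys struct {
	PrivateKeys [][]byte // serialized BLS secret keys
}

type PrivateKeyResponse struct {
	PrivateKeys *PrivateKeys
}
```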
## Server
The server manages the private key database, allocates new private keys, makes
validator deposits, and fulfills requests from pods for private key allocation.
### Database structure
The server uses two buckets: one for unallocated keys and one for allocated
keys.

Unallocated keys bucket:

| key         | value |
|-------------|-------|
| private key | nil   |

Allocated keys bucket:

| key      | value                |
|----------|----------------------|
| pod name | list of private keys |
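
A minimal bbolt sketch of this layout, assuming the bucket names used by
`db.go` below (`unassigned_pks`/`assigned_pks`) and the removed `proto/cluster`
package; the real code also maintains metrics and a deleted-keys bucket:

```go
package main

import (
	pb "github.com/prysmaticlabs/prysm/proto/cluster"
	bolt "go.etcd.io/bbolt"
	"google.golang.org/protobuf/proto"
)

// storeKeys sketches the two-bucket layout: unassigned keys are stored
// key-only with a dummy marker value, while assigned keys are stored per
// pod name as a serialized pb.PrivateKeys list.
func storeKeys(db *bolt.DB, podName string, assigned, unassigned [][]byte) error {
	return db.Update(func(tx *bolt.Tx) error {
		ub, err := tx.CreateBucketIfNotExists([]byte("unassigned_pks"))
		if err != nil {
			return err
		}
		for _, pk := range unassigned {
			if err := ub.Put(pk, []byte{1}); err != nil { // value is only a marker
				return err
			}
		}
		ab, err := tx.CreateBucketIfNotExists([]byte("assigned_pks"))
		if err != nil {
			return err
		}
		data, err := proto.Marshal(&pb.PrivateKeys{PrivateKeys: assigned})
		if err != nil {
			return err
		}
		return ab.Put([]byte(podName), data)
	})
}
```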
### Key management design
There are two types of operations with regard to private keys:

- Allocate(podName, keys)
- UnallocateAllKeys(podName)

Allocating keys first attempts to recycle existing, unused keys. If there are
no unused keys available (or not enough), new keys are generated and deposited.
Unallocating keys happens when a pod is destroyed; this returns all of that
pod's keys to the unallocated keys bucket.
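
Expressed as a Go interface, the contract looks roughly like this. The names
follow the README and are hypothetical; the concrete `db` type in `db.go` below
realizes them as `AssignExistingPKs`/`AllocateNewPkToPod` and
`RemovePKAssignment`:

```go
package main

import (
	"context"

	pb "github.com/prysmaticlabs/prysm/proto/cluster"
)

// keyManager names the two operations described above; hypothetical, since the
// real code uses concrete methods on the db type rather than an interface.
type keyManager interface {
	// Allocate recycles unused keys first, then generates and deposits new ones.
	Allocate(ctx context.Context, podName string, numKeys uint64) (*pb.PrivateKeys, error)
	// UnallocateAllKeys returns all of a destroyed pod's keys to the unallocated bucket.
	UnallocateAllKeys(ctx context.Context, podName string) error
}
```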
### Allocations HTTP page `/allocations`

The server exposes an HTTP page that maps pod names to public keys, served at
`:8080/allocations` (see `allocations.go` below). This may be useful for
determining which logs to follow for a given validator.
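
Each line pairs a pod name with a hex-encoded BLS public key; hypothetical
example output with truncated keys:

```
validator-pod-0=0x8f1da2...
validator-pod-1=0xa20c37...
```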
## Client
The client requests private keys for a given pod name and writes the server's
response to an unencrypted keystore JSON file.
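
A hypothetical invocation, using the flag names from `client/main.go` below
(the server address and output path are made up):

```
client --server=pk-manager:8000 --pod-name="$POD_NAME" --keys=8 \
  --output-json=/keys/unencrypted_keys.json
```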

tools/cluster-pk-manager/client/BUILD.bazel

@@ -1,48 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
load("@io_bazel_rules_docker//go:image.bzl", "go_image")
load("@io_bazel_rules_docker//container:container.bzl", "container_bundle")
load("@io_bazel_rules_docker//contrib:push-all.bzl", "docker_push")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "github.com/prysmaticlabs/prysm/tools/cluster-pk-manager/client",
visibility = ["//visibility:private"],
deps = [
"//proto/cluster:go_default_library",
"//shared/maxprocs:go_default_library",
"//shared/params:go_default_library",
"@com_github_bazelbuild_buildtools//file:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
go_binary(
name = "client",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)
go_image(
name = "image",
base = "//tools:cc_image",
binary = ":client",
tags = ["manual"],
visibility = ["//visibility:private"],
)
container_bundle(
name = "image_bundle",
images = {
"gcr.io/prysmaticlabs/prysm/cluster-pk-manager/client:latest": ":image",
"gcr.io/prysmaticlabs/prysm/cluster-pk-manager/client:{DOCKER_TAG}": ":image",
},
tags = ["manual"],
)
docker_push(
name = "push_images",
bundle = ":image_bundle",
tags = ["manual"],
)

tools/cluster-pk-manager/client/main.go

@@ -1,76 +0,0 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"github.com/bazelbuild/buildtools/file"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
_ "github.com/prysmaticlabs/prysm/shared/maxprocs"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc"
)
var (
serverAddr = flag.String("server", "", "The address of the gRPC server")
podName = flag.String("pod-name", "", "The name of the pod running this tool")
numKeys = flag.Uint64("keys", 1, "The number of keys to request")
outputJSON = flag.String("output-json", "", "JSON file to write output to")
)
// UnencryptedKeysContainer defines the structure of the unencrypted key JSON file.
type UnencryptedKeysContainer struct {
Keys []*UnencryptedKeys `json:"keys"`
}
// UnencryptedKeys is the inner struct of the JSON file.
type UnencryptedKeys struct {
ValidatorKey []byte `json:"validator_key"`
WithdrawalKey []byte `json:"withdrawal_key"`
}
func main() {
params.UsePyrmontConfig()
flag.Parse()
fmt.Printf("Starting client to fetch private key for pod %s\n", *podName)
ctx := context.Background()
conn, err := grpc.DialContext(ctx, *serverAddr, grpc.WithInsecure())
if err != nil {
panic(err)
}
client := pb.NewPrivateKeyServiceClient(conn)
resp, err := client.Request(ctx, &pb.PrivateKeyRequest{
PodName: *podName,
NumberOfKeys: *numKeys,
})
if err != nil {
panic(err)
}
keys := make([]*UnencryptedKeys, len(resp.PrivateKeys.PrivateKeys))
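// The server deposits with the validator key doubling as the withdrawal key
// (see the NOTE in server.go), so mirror that assignment here.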
for i, privateKey := range resp.PrivateKeys.PrivateKeys {
keys[i] = &UnencryptedKeys{
ValidatorKey: privateKey,
WithdrawalKey: privateKey,
}
}
c := &UnencryptedKeysContainer{Keys: keys}
enc, err := json.Marshal(c)
if err != nil {
panic(err)
}
if err := file.WriteFile(*outputJSON, enc); err != nil {
panic(err)
}
fmt.Printf("Wrote %d keys\n", len(keys))
}

tools/cluster-pk-manager/server/BUILD.bazel

@@ -1,88 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
load("@io_bazel_rules_docker//go:image.bzl", "go_image")
load("@io_bazel_rules_docker//container:container.bzl", "container_bundle")
load("@io_bazel_rules_docker//contrib:push-all.bzl", "docker_push")
go_library(
name = "go_default_library",
srcs = [
"allocations.go",
"db.go",
"keyChecker.go",
"logger.go",
"main.go",
"server.go",
"watchtower.go",
],
importpath = "github.com/prysmaticlabs/prysm/tools/cluster-pk-manager/server",
visibility = ["//visibility:private"],
deps = [
"//contracts/deposit-contract:go_default_library",
"//proto/cluster:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/depositutil:go_default_library",
"//shared/keystore:go_default_library",
"//shared/maxprocs:go_default_library",
"//shared/params:go_default_library",
"//shared/prometheus:go_default_library",
"@com_github_ethereum_go_ethereum//:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_client_go//kubernetes:go_default_library",
"@io_k8s_client_go//rest:go_default_library",
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
go_binary(
name = "server",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
size = "small",
srcs = ["server_test.go"],
embed = [":go_default_library"],
deps = ["//proto/cluster:go_default_library"],
)
go_image(
name = "image",
base = "//tools:cc_image",
binary = ":server",
tags = ["manual"],
visibility = ["//visibility:private"],
)
container_bundle(
name = "image_bundle",
images = {
"gcr.io/prysmaticlabs/prysm/cluster-pk-manager/server:latest": ":image",
"gcr.io/prysmaticlabs/prysm/cluster-pk-manager/server:{DOCKER_TAG}": ":image",
},
tags = ["manual"],
)
docker_push(
name = "push_images",
bundle = ":image_bundle",
tags = ["manual"],
)

tools/cluster-pk-manager/server/allocations.go

@@ -1,39 +0,0 @@
package main
import (
"fmt"
"io"
"net/http"
)
func (s *server) serveAllocationsHTTPPage() {
mux := http.NewServeMux()
mux.HandleFunc("/allocations", func(w http.ResponseWriter, _ *http.Request) {
res := ""
a, err := s.db.Allocations()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
if _, err := io.WriteString(w, err.Error()); err != nil {
log.Error(err)
}
return
}
for podName, pubkeys := range a {
for _, pk := range pubkeys {
res += fmt.Sprintf("%s=%#x\n", podName, pk)
}
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
if _, err := io.WriteString(w, res); err != nil {
log.Error(err)
}
})
srv := &http.Server{Addr: ":8080", Handler: mux}
go func() {
log.Fatal(srv.ListenAndServe())
}()
log.Info("Serving allocations page at :8080/allocations")
}

tools/cluster-pk-manager/server/db.go

@@ -1,354 +0,0 @@
package main
import (
"bytes"
"context"
"errors"
"path"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/keystore"
"github.com/prysmaticlabs/prysm/shared/params"
bolt "go.etcd.io/bbolt"
"google.golang.org/protobuf/proto"
)
var (
allocatedPkCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "allocated_pk_count",
Help: "The number of allocated private keys",
})
assignedPkCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "assigned_pk_count",
Help: "The number of private keys currently assigned to alive pods",
})
bannedPKCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "banned_pk_count",
Help: "The number of private keys which have been removed that are of exited validators",
})
)
var (
dbFileName = "pk.db"
assignedPkBucket = []byte("assigned_pks")
unassignedPkBucket = []byte("unassigned_pks")
deletedKeysBucket = []byte("deleted_pks")
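// dummyVal is a placeholder value for buckets that are used as sets.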
dummyVal = []byte{1}
)
type keyMap struct {
podName string
privateKey []byte
index int
}
type db struct {
db *bolt.DB
}
func newDB(dbPath string) *db {
datafile := path.Join(dbPath, dbFileName)
boltdb, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: params.BeaconIoConfig().BoltTimeout})
if err != nil {
panic(err)
}
// Initialize buckets
if err := boltdb.Update(func(tx *bolt.Tx) error {
for _, bkt := range [][]byte{assignedPkBucket, unassignedPkBucket, deletedKeysBucket} {
if _, err := tx.CreateBucketIfNotExists(bkt); err != nil {
return err
}
}
return nil
}); err != nil {
panic(err)
}
// Populate metrics on start.
if err := boltdb.View(func(tx *bolt.Tx) error {
// Populate banned key count.
bannedPKCount.Set(float64(tx.Bucket(deletedKeysBucket).Stats().KeyN))
keys := 0
// Iterate over all of the pod assigned keys (one to many).
c := tx.Bucket(assignedPkBucket).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(v, pks); err != nil {
log.WithError(err).Error("Unable to unmarshal private key")
continue
}
keys += len(pks.PrivateKeys)
}
assignedPkCount.Set(float64(keys))
// Add the unassigned keys count (one to one).
keys += tx.Bucket(unassignedPkBucket).Stats().KeyN
allocatedPkCount.Add(float64(keys))
return nil
}); err != nil {
panic(err)
}
return &db{db: boltdb}
}
// UnallocatedPKs returns unassigned private keys, if any are available.
func (d *db) UnallocatedPKs(_ context.Context, numKeys uint64) (*pb.PrivateKeys, error) {
pks := &pb.PrivateKeys{}
if err := d.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(unassignedPkBucket).Cursor()
i := uint64(0)
for k, _ := c.First(); k != nil && i < numKeys; k, _ = c.Next() {
pks.PrivateKeys = append(pks.PrivateKeys, k)
i++
}
return nil
}); err != nil {
return nil, err
}
return pks, nil
}
// DeleteUnallocatedKey removes the provided private key and records it in the
// deleted keys bucket.
func (d *db) DeleteUnallocatedKey(_ context.Context, privateKey []byte) error {
return d.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(unassignedPkBucket).Delete(privateKey); err != nil {
return err
}
if err := tx.Bucket(deletedKeysBucket).Put(privateKey, dummyVal); err != nil {
return err
}
bannedPKCount.Inc()
allocatedPkCount.Dec()
return nil
})
}
// PodPKs returns the private keys assigned to the given pod name, if any exist.
func (d *db) PodPKs(_ context.Context, podName string) (*pb.PrivateKeys, error) {
pks := &pb.PrivateKeys{}
if err := d.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(assignedPkBucket).Get([]byte(podName))
return proto.Unmarshal(b, pks)
}); err != nil {
return nil, err
}
return pks, nil
}
// AllocateNewPkToPod records a new private key assignment in the DB.
func (d *db) AllocateNewPkToPod(
_ context.Context,
pk *keystore.Key,
podName string,
) error {
allocatedPkCount.Inc()
assignedPkCount.Inc()
return d.db.Update(func(tx *bolt.Tx) error {
pks := &pb.PrivateKeys{}
if b := tx.Bucket(assignedPkBucket).Get([]byte(podName)); b != nil {
if err := proto.Unmarshal(b, pks); err != nil {
return err
}
}
pks.PrivateKeys = append(pks.PrivateKeys, pk.SecretKey.Marshal())
b, err := proto.Marshal(pks)
if err != nil {
return err
}
return tx.Bucket(assignedPkBucket).Put(
[]byte(podName),
b,
)
})
}
// RemovePKAssignment removes a pod's key assignment and returns its private
// keys to the unassigned bucket.
func (d *db) RemovePKAssignment(_ context.Context, podName string) error {
return d.db.Update(func(tx *bolt.Tx) error {
data := tx.Bucket(assignedPkBucket).Get([]byte(podName))
if data == nil {
log.WithField("podName", podName).Warn("Nil private key returned from db")
return nil
}
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(data, pks); err != nil {
log.WithError(err).Error("Failed to unmarshal pks, deleting from db")
return tx.Bucket(assignedPkBucket).Delete([]byte(podName))
}
if err := tx.Bucket(assignedPkBucket).Delete([]byte(podName)); err != nil {
return err
}
assignedPkCount.Sub(float64(len(pks.PrivateKeys)))
for _, pk := range pks.PrivateKeys {
if err := tx.Bucket(unassignedPkBucket).Put(pk, dummyVal); err != nil {
return err
}
}
return nil
})
}
// AssignExistingPKs assigns PKs from the unassigned bucket to a given pod.
func (d *db) AssignExistingPKs(_ context.Context, pks *pb.PrivateKeys, podName string) error {
return d.db.Update(func(tx *bolt.Tx) error {
for _, pk := range pks.PrivateKeys {
if bytes.Equal(tx.Bucket(unassignedPkBucket).Get(pk), dummyVal) {
if err := tx.Bucket(unassignedPkBucket).Delete(pk); err != nil {
return err
}
}
}
assignedPkCount.Add(float64(len(pks.PrivateKeys)))
// If a pod assignment already exists, merge its keys into the new set.
if existing := tx.Bucket(assignedPkBucket).Get([]byte(podName)); existing != nil {
existingKeys := &pb.PrivateKeys{}
if err := proto.Unmarshal(existing, existingKeys); err == nil {
pks.PrivateKeys = append(pks.PrivateKeys, existingKeys.PrivateKeys...)
}
}
data, err := proto.Marshal(pks)
if err != nil {
return err
}
return tx.Bucket(assignedPkBucket).Put([]byte(podName), data)
})
}
// AllocatedPodNames returns the string list of pod names with current private
// key allocations.
func (d *db) AllocatedPodNames(_ context.Context) ([]string, error) {
var podNames []string
if err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(assignedPkBucket).ForEach(func(k, v []byte) error {
podNames = append(podNames, string(k))
return nil
})
}); err != nil {
return nil, err
}
return podNames, nil
}
// Allocations builds and returns key allocations.
func (d *db) Allocations() (map[string][][]byte, error) {
m := make(map[string][][]byte)
if err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(assignedPkBucket).ForEach(func(k, v []byte) error {
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(v, pks); err != nil {
log.WithError(err).Error("Could not unmarshal private key")
}
pubkeys := make([][]byte, len(pks.PrivateKeys))
for i, pk := range pks.PrivateKeys {
k, err := bls.SecretKeyFromBytes(pk)
if err != nil {
return err
}
pubkeys[i] = k.PublicKey().Marshal()
}
m[string(k)] = pubkeys
return nil
})
}); err != nil {
return nil, err
}
return m, nil
}
// KeyMap builds a lookup from 48-byte BLS public keys to their pod assignment.
func (d *db) KeyMap() ([][]byte, map[[48]byte]keyMap, error) {
m := make(map[[48]byte]keyMap)
pubkeys := make([][]byte, 0)
if err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(assignedPkBucket).ForEach(func(k, v []byte) error {
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(v, pks); err != nil {
return err
}
for i, pk := range pks.PrivateKeys {
seckey, err := bls.SecretKeyFromBytes(pk)
if err != nil {
log.WithError(err).Warn("Could not deserialize secret key... removing")
return tx.Bucket(assignedPkBucket).Delete(k)
}
keytoSet := bytesutil.ToBytes48(seckey.PublicKey().Marshal())
m[keytoSet] = keyMap{
podName: string(k),
privateKey: pk,
index: i,
}
pubkeys = append(pubkeys, seckey.PublicKey().Marshal())
}
return nil
})
}); err != nil {
return nil, nil, err
}
return pubkeys, m, nil
}
// RemovePKFromPod removes a single key from the pod's assignment and bans it.
func (d *db) RemovePKFromPod(podName string, key []byte) error {
return d.db.Update(func(tx *bolt.Tx) error {
data := tx.Bucket(assignedPkBucket).Get([]byte(podName))
if data == nil {
log.WithField("podName", podName).Warn("Nil private key returned from db")
return nil
}
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(data, pks); err != nil {
log.WithError(err).Error("Unable to unmarshal private keys, deleting assignment from db")
return tx.Bucket(assignedPkBucket).Delete([]byte(podName))
}
found := false
for i, k := range pks.PrivateKeys {
if bytes.Equal(k, key) {
found = true
pks.PrivateKeys = append(pks.PrivateKeys[:i], pks.PrivateKeys[i+1:]...)
break
}
}
if !found {
return errors.New("private key not assigned to pod")
}
marshaled, err := proto.Marshal(pks)
if err != nil {
return err
}
bannedPKCount.Inc()
allocatedPkCount.Dec()
assignedPkCount.Dec()
nowBytes, err := time.Now().MarshalBinary()
if err != nil {
return err
}
if err := tx.Bucket(deletedKeysBucket).Put(key, nowBytes); err != nil {
return err
}
return tx.Bucket(assignedPkBucket).Put([]byte(podName), marshaled)
})
}

tools/cluster-pk-manager/server/keyChecker.go

@@ -1,33 +0,0 @@
package main
import (
"time"
)
var keyInterval = 3 * time.Minute
type keyChecker struct {
db *db
}
func newkeyChecker(db *db, _ string) *keyChecker {
log.Warn("Key checker temporarily disabled during refactor.")
return &keyChecker{
db: db,
}
}
func (k *keyChecker) checkKeys() error {
log.Warn("Not checking for EXITED validator keys.")
return nil
}
func (k *keyChecker) run() {
for {
if err := k.checkKeys(); err != nil {
log.WithError(err).Error("Failed to check keys")
}
time.Sleep(keyInterval)
}
}

tools/cluster-pk-manager/server/logger.go

@@ -1,5 +0,0 @@
package main
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "main")

tools/cluster-pk-manager/server/main.go

@@ -1,69 +0,0 @@
package main
import (
"flag"
"fmt"
"net"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
_ "github.com/prysmaticlabs/prysm/shared/maxprocs"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/prometheus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
var (
port = flag.Int("port", 8000, "The port to serve gRPC requests")
metricsPort = flag.Int("metrics-port", 9090, "The port to serve /metrics")
privateKey = flag.String("private-key", "", "The private key of the funder account")
rpcPath = flag.String("rpc", "https://goerli.prylabs.net", "RPC address of a running ETH1 node")
beaconRPCPath = flag.String("beaconRPC", "localhost:4000", "RPC address of Beacon Node")
depositContractAddr = flag.String("deposit-contract", "", "Address of the deposit contract")
depositAmount = flag.String("deposit-amount", "", "The amount of wei to deposit into the contract")
dbPath = flag.String("db-path", "", "The file path for database storage")
disableWatchtower = flag.Bool("disable-watchtower", false, "Disable kubernetes pod watcher. Useful for local testing")
verbose = flag.Bool("verbose", false, "Enable debug logging")
ensureDeposited = flag.Bool("ensure-deposited", false, "Ensure keys are deposited")
allowNewDeposits = flag.Bool("allow-new-deposits", true, "Allow cluster PK manager to send new deposits or generate new keys")
)
func main() {
params.UsePyrmontConfig()
flag.Parse()
if *verbose {
logrus.SetLevel(logrus.DebugLevel)
}
if *ensureDeposited {
log.Warn("--ensure-deposited: Ensuring all keys are deposited or deleting them from database!")
}
if !*allowNewDeposits {
log.Warn("Disallowing new deposits")
}
db := newDB(*dbPath)
srv := newServer(db, *rpcPath, *depositContractAddr, *privateKey, *depositAmount, *beaconRPCPath)
if !*disableWatchtower {
wt := newWatchtower(db)
go wt.WatchPods()
}
kc := newkeyChecker(db, *beaconRPCPath)
go kc.run()
s := grpc.NewServer()
pb.RegisterPrivateKeyServiceServer(s, srv)
go prometheus.RunSimpleServerOrDie(fmt.Sprintf(":%d", *metricsPort))
srv.serveAllocationsHTTPPage()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
panic(err)
}
log.Infof("Listening for gRPC requests on port %d", *port)
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
}
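
A hypothetical local invocation with the Kubernetes watchtower disabled (flag
names from the block above; the key and contract address are placeholders, and
the amount shown is 32 ETH in wei):

```
server --private-key=<funder-key> --deposit-contract=<contract-addr> \
  --deposit-amount=32000000000000000000 --db-path=/data --disable-watchtower
```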

tools/cluster-pk-manager/server/server.go

@@ -1,243 +0,0 @@
package main
import (
"context"
"crypto/ecdsa"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/depositutil"
"github.com/prysmaticlabs/prysm/shared/keystore"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
)
var gasLimit = uint64(4000000)
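// blockTime is a bare count of seconds (presumably the ~14s eth1 block
// interval); it is multiplied by time.Second where it is used below.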
var blockTime = time.Duration(14)
type server struct {
contract *contracts.DepositContract
db *db
depositAmount *big.Int
txPk *ecdsa.PrivateKey
client *ethclient.Client
beacon ethpb.BeaconNodeValidatorClient
clientLock sync.Mutex
}
func newServer(db *db, rpcAddr, depositContractAddr, funderPK, validatorDepositAmount, beaconRPCAddr string) *server {
rpcClient, err := rpc.Dial(rpcAddr)
if err != nil {
panic(err)
}
client := ethclient.NewClient(rpcClient)
contract, err := contracts.NewDepositContract(common.HexToAddress(depositContractAddr), client)
if err != nil {
panic(err)
}
txPk, err := crypto.HexToECDSA(funderPK)
if err != nil {
panic(err)
}
depositAmount := big.NewInt(0)
depositAmount.SetString(validatorDepositAmount, 10)
conn, err := grpc.DialContext(context.Background(), beaconRPCAddr, grpc.WithInsecure(), grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", beaconRPCAddr, err)
}
return &server{
contract: contract,
client: client,
db: db,
depositAmount: depositAmount,
txPk: txPk,
beacon: ethpb.NewBeaconNodeValidatorClient(conn),
}
}
func (s *server) makeDeposit(pubkey, withdrawalCredentials, signature []byte, depositRoot [32]byte) (*types.Transaction, error) {
txOps := bind.NewKeyedTransactor(s.txPk)
txOps.Value = s.depositAmount
txOps.GasLimit = gasLimit
tx, err := s.contract.Deposit(txOps, pubkey, withdrawalCredentials, signature, depositRoot)
if err != nil {
return nil, errors.Wrap(err, "deposit failed")
}
log.WithField("tx", tx.Hash().Hex()).Info("Deposit transaction sent")
return tx, nil
}
// Request processes private key requests from validator pods.
func (s *server) Request(ctx context.Context, req *pb.PrivateKeyRequest) (*pb.PrivateKeyResponse, error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if req.NumberOfKeys == 0 {
req.NumberOfKeys = 1
}
// build the list of PKs in the following order, until the requested
// amount is ready to return.
// - PKs already assigned to the pod
// - PKs that have not yet been allocated
// - PKs that are newly initialized with deposits
pks, err := s.db.PodPKs(ctx, req.PodName)
if err != nil {
return nil, err
}
if pks != nil && len(pks.PrivateKeys) > 0 {
log.WithField("pod", req.PodName).Debug("Returning existing assignment(s)")
return &pb.PrivateKeyResponse{
PrivateKeys: pks,
}, nil
}
unallocated, err := s.db.UnallocatedPKs(ctx, req.NumberOfKeys)
if err != nil {
return nil, err
}
log.WithField(
"pod", req.PodName,
).WithField(
"keys", len(unallocated.PrivateKeys),
).Debug("Recycling existing private key(s)")
pks.PrivateKeys = append(pks.PrivateKeys, unallocated.PrivateKeys...)
if *ensureDeposited {
log.Debugf("Ensuring %d keys are deposited", len(pks.PrivateKeys))
ok := make([][]byte, 0, len(pks.PrivateKeys))
for _, pk := range pks.PrivateKeys {
sk, err := bls.SecretKeyFromBytes(pk)
if err != nil || sk == nil {
continue
}
pub := sk.PublicKey().Marshal()
req := &ethpb.ValidatorStatusRequest{PublicKey: pub}
res, err := s.beacon.ValidatorStatus(ctx, req)
if err != nil {
log.WithError(err).Error("Failed to get validator status")
continue
}
if res.Status == ethpb.ValidatorStatus_UNKNOWN_STATUS {
log.Warn("Deleting unknown deposit pubkey")
if err := s.db.DeleteUnallocatedKey(ctx, pk); err != nil {
log.WithError(err).Error("Failed to delete unallocated key")
}
} else {
ok = append(ok, pk)
}
}
pks.PrivateKeys = ok
}
if len(pks.PrivateKeys) < int(req.NumberOfKeys) {
c := int(req.NumberOfKeys) - len(pks.PrivateKeys)
newKeys, err := s.allocateNewKeys(ctx, req.PodName, c)
if err != nil {
return nil, err
}
pks.PrivateKeys = append(pks.PrivateKeys, newKeys.PrivateKeys...)
}
if err := s.db.AssignExistingPKs(ctx, pks, req.PodName); err != nil {
return nil, err
}
return &pb.PrivateKeyResponse{PrivateKeys: pks}, nil
}
func (s *server) allocateNewKeys(ctx context.Context, podName string, numKeys int) (*pb.PrivateKeys, error) {
if !*allowNewDeposits {
return nil, errors.New("new deposits not allowed")
}
pks := make([][]byte, 0, numKeys)
txMap := make(map[*keystore.Key]*types.Transaction)
for i := 0; i < numKeys; i++ {
key, err := keystore.NewKey()
if err != nil {
return nil, err
}
// Make the validator deposit
// NOTE: This uses the validator key as the withdrawal key
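// The flag-provided deposit amount is in wei; DepositInput takes gwei,
// hence the division by 1e9 below.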
di, dr, err := depositutil.DepositInput(
key.SecretKey, /*depositKey*/
key.SecretKey, /*withdrawalKey*/
new(big.Int).Div(s.depositAmount, big.NewInt(1e9)).Uint64(),
)
if err != nil {
return nil, err
}
// Do the actual deposit
tx, err := s.makeDeposit(di.PublicKey, di.WithdrawalCredentials, di.Signature, dr)
if err != nil {
return nil, err
}
txMap[key] = tx
// Store in database
if err := s.db.AllocateNewPkToPod(ctx, key, podName); err != nil {
return nil, err
}
}
for {
time.Sleep(time.Second * blockTime)
receivedKeys, err := s.checkDepositTxs(ctx, txMap)
if err != nil {
return nil, err
}
pks = append(pks, receivedKeys...)
if len(txMap) == 0 {
break
}
}
return &pb.PrivateKeys{PrivateKeys: pks}, nil
}
func (s *server) checkDepositTxs(ctx context.Context, txMap map[*keystore.Key]*types.Transaction) ([][]byte,
error) {
pks := make([][]byte, 0, len(txMap))
for k, tx := range txMap {
receipt, err := s.client.TransactionReceipt(ctx, tx.Hash())
if errors.Is(err, ethereum.NotFound) {
// Tx not mined yet; check again on the next pass.
continue
}
if err != nil {
return nil, err
}
if receipt.Status == types.ReceiptStatusFailed {
delete(txMap, k)
continue
}
// append key if tx succeeded.
pks = append(pks, k.SecretKey.Marshal())
delete(txMap, k)
}
return pks, nil
}

tools/cluster-pk-manager/server/server_test.go

@@ -1,7 +0,0 @@
package main
import (
pb "github.com/prysmaticlabs/prysm/proto/cluster"
)
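// Compile-time assertion that server satisfies the generated gRPC service interface.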
var _ pb.PrivateKeyServiceServer = (*server)(nil)

tools/cluster-pk-manager/server/watchtower.go

@@ -1,71 +0,0 @@
package main
import (
"context"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
var queryInterval = 3 * time.Second
var namespace = "beacon-chain"
var podSelector = "component=validator"
type watchtower struct {
db *db
client *kubernetes.Clientset
}
func newWatchtower(db *db) *watchtower {
config, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
client := kubernetes.NewForConfigOrDie(config)
return &watchtower{db, client}
}
// WatchPods watches pods for termination and updates key allocations.
func (wt *watchtower) WatchPods() {
for {
time.Sleep(queryInterval)
if err := wt.queryPodsAndUpdateDB(); err != nil {
log.WithField("error", err).Error("Failed to update pods")
}
}
}
// Query k8s pods for existence.
func (wt *watchtower) queryPodsAndUpdateDB() error {
ctx := context.Background()
podNames, err := wt.db.AllocatedPodNames(ctx)
if err != nil {
return err
}
pList, err := wt.client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: podSelector})
if err != nil {
return err
}
podExist := make(map[string]bool)
for _, p := range pList.Items {
if p.Status.Phase == v1.PodRunning || p.Status.Phase == v1.PodPending {
podExist[p.Name] = true
} else {
log.Debugf("ignoring pod with phase %s", p.Status.Phase)
}
}
for _, p := range podNames {
if !podExist[p] {
log.WithField("pod", p).Debug("Removing assignment from dead pod")
if err := wt.db.RemovePKAssignment(ctx, p); err != nil {
return err
}
}
}
return nil
}