Update cluster pk manager to assign multiple keys to validators (#2112)

* checkpoint

* move lock

* checkpoint

* checkpoint

* chkpt

* update readme so i know what im supposed to do

* ckpt

* checkpoint

* checkpoint

* chkpt

* Fix image build

* handle errors

* add port to pod

* lint

* Update beacon-config.config.yaml

* lint
This commit is contained in:
Preston Van Loon
2019-04-14 17:53:34 -04:00
committed by GitHub
parent 4659b4a2da
commit 2148a83b02
10 changed files with 345 additions and 103 deletions

View File

@@ -64,6 +64,8 @@ spec:
ports: ports:
- containerPort: 8000 - containerPort: 8000
name: grpc-service name: grpc-service
- containerPort: 8080
name: allocations
- containerPort: 9090 - containerPort: 9090
name: prometheus name: prometheus
volumeMounts: volumeMounts:

View File

@@ -25,6 +25,7 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type PrivateKeyRequest struct { type PrivateKeyRequest struct {
PodName string `protobuf:"bytes,1,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` PodName string `protobuf:"bytes,1,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"`
NumberOfKeys uint64 `protobuf:"varint,2,opt,name=number_of_keys,json=numberOfKeys,proto3" json:"number_of_keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -62,11 +63,19 @@ func (m *PrivateKeyRequest) GetPodName() string {
return "" return ""
} }
func (m *PrivateKeyRequest) GetNumberOfKeys() uint64 {
if m != nil {
return m.NumberOfKeys
}
return 0
}
type PrivateKeyResponse struct { type PrivateKeyResponse struct {
PrivateKey []byte `protobuf:"bytes,1,opt,name=private_key,json=privateKey,proto3" json:"private_key,omitempty"` PrivateKey []byte `protobuf:"bytes,1,opt,name=private_key,json=privateKey,proto3" json:"private_key,omitempty"` // Deprecated: Do not use.
XXX_NoUnkeyedLiteral struct{} `json:"-"` PrivateKeys *PrivateKeys `protobuf:"bytes,2,opt,name=private_keys,json=privateKeys,proto3" json:"private_keys,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *PrivateKeyResponse) Reset() { *m = PrivateKeyResponse{} } func (m *PrivateKeyResponse) Reset() { *m = PrivateKeyResponse{} }
@@ -94,6 +103,7 @@ func (m *PrivateKeyResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_PrivateKeyResponse proto.InternalMessageInfo var xxx_messageInfo_PrivateKeyResponse proto.InternalMessageInfo
// Deprecated: Do not use.
func (m *PrivateKeyResponse) GetPrivateKey() []byte { func (m *PrivateKeyResponse) GetPrivateKey() []byte {
if m != nil { if m != nil {
return m.PrivateKey return m.PrivateKey
@@ -101,27 +111,78 @@ func (m *PrivateKeyResponse) GetPrivateKey() []byte {
return nil return nil
} }
func (m *PrivateKeyResponse) GetPrivateKeys() *PrivateKeys {
if m != nil {
return m.PrivateKeys
}
return nil
}
type PrivateKeys struct {
PrivateKeys [][]byte `protobuf:"bytes,1,rep,name=private_keys,json=privateKeys,proto3" json:"private_keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PrivateKeys) Reset() { *m = PrivateKeys{} }
func (m *PrivateKeys) String() string { return proto.CompactTextString(m) }
func (*PrivateKeys) ProtoMessage() {}
func (*PrivateKeys) Descriptor() ([]byte, []int) {
return fileDescriptor_f118b0fcaa41cfbe, []int{2}
}
func (m *PrivateKeys) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PrivateKeys.Unmarshal(m, b)
}
func (m *PrivateKeys) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PrivateKeys.Marshal(b, m, deterministic)
}
func (m *PrivateKeys) XXX_Merge(src proto.Message) {
xxx_messageInfo_PrivateKeys.Merge(m, src)
}
func (m *PrivateKeys) XXX_Size() int {
return xxx_messageInfo_PrivateKeys.Size(m)
}
func (m *PrivateKeys) XXX_DiscardUnknown() {
xxx_messageInfo_PrivateKeys.DiscardUnknown(m)
}
var xxx_messageInfo_PrivateKeys proto.InternalMessageInfo
func (m *PrivateKeys) GetPrivateKeys() [][]byte {
if m != nil {
return m.PrivateKeys
}
return nil
}
func init() { func init() {
proto.RegisterType((*PrivateKeyRequest)(nil), "prysm.internal.cluster.PrivateKeyRequest") proto.RegisterType((*PrivateKeyRequest)(nil), "prysm.internal.cluster.PrivateKeyRequest")
proto.RegisterType((*PrivateKeyResponse)(nil), "prysm.internal.cluster.PrivateKeyResponse") proto.RegisterType((*PrivateKeyResponse)(nil), "prysm.internal.cluster.PrivateKeyResponse")
proto.RegisterType((*PrivateKeys)(nil), "prysm.internal.cluster.PrivateKeys")
} }
func init() { proto.RegisterFile("proto/cluster/services.proto", fileDescriptor_f118b0fcaa41cfbe) } func init() { proto.RegisterFile("proto/cluster/services.proto", fileDescriptor_f118b0fcaa41cfbe) }
var fileDescriptor_f118b0fcaa41cfbe = []byte{ var fileDescriptor_f118b0fcaa41cfbe = []byte{
// 182 bytes of a gzipped FileDescriptorProto // 256 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0x28, 0xca, 0x2f, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0x41, 0x4b, 0xc3, 0x30,
0xc9, 0xd7, 0x4f, 0xce, 0x29, 0x2d, 0x2e, 0x49, 0x2d, 0xd2, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x14, 0xc7, 0xc9, 0x14, 0xa7, 0xaf, 0x45, 0x30, 0x07, 0xa9, 0xe2, 0xa1, 0x76, 0x1e, 0xaa, 0x87,
0x4e, 0x2d, 0xd6, 0x03, 0x0b, 0x0b, 0x89, 0x15, 0x14, 0x55, 0x16, 0xe7, 0xea, 0x65, 0xe6, 0x95, 0x4c, 0xe6, 0x37, 0xf0, 0xe0, 0x65, 0xa0, 0x12, 0xbd, 0xd7, 0x6e, 0x7b, 0x83, 0xe2, 0x9a, 0xc4,
0xa4, 0x16, 0xe5, 0x25, 0xe6, 0xe8, 0x41, 0x95, 0x29, 0xe9, 0x71, 0x09, 0x06, 0x14, 0x65, 0x96, 0xbc, 0x74, 0xd0, 0xa3, 0xdf, 0x5c, 0x6c, 0xa6, 0x8d, 0x22, 0xb8, 0xeb, 0x3f, 0xbf, 0xf7, 0x4b,
0x25, 0x96, 0xa4, 0x7a, 0xa7, 0x56, 0x06, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x49, 0x72, 0xf2, 0x7f, 0x70, 0x66, 0xac, 0x76, 0x7a, 0x3c, 0x5f, 0x35, 0xe4, 0xd0, 0x8e, 0x09, 0xed, 0xba,
0x71, 0x14, 0xe4, 0xa7, 0xc4, 0xe7, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x9a, 0x23, 0x89, 0x2e, 0xe6, 0xc7, 0xc6, 0xb6, 0x54, 0x8b, 0x4a, 0x39, 0xb4, 0xaa, 0x5c, 0x89,
0xb1, 0x17, 0xe4, 0xa7, 0xf8, 0x25, 0xe6, 0xa6, 0x2a, 0x99, 0x72, 0x09, 0x21, 0xab, 0x2f, 0x2e, 0x0d, 0x96, 0x3d, 0xc3, 0xd1, 0xa3, 0xad, 0xd6, 0xa5, 0xc3, 0x29, 0xb6, 0x12, 0xdf, 0x1a, 0x24,
0xc8, 0xcf, 0x2b, 0x4e, 0x15, 0x92, 0xe7, 0xe2, 0x2e, 0x80, 0x88, 0xc6, 0x67, 0xa7, 0x56, 0x82, 0xc7, 0x4f, 0x60, 0xdf, 0xe8, 0x45, 0xa1, 0xca, 0x1a, 0x13, 0x96, 0xb2, 0xfc, 0x40, 0x0e, 0x8d,
0xf5, 0xf0, 0x04, 0x71, 0x15, 0xc0, 0x15, 0x1a, 0x95, 0x22, 0x5b, 0x13, 0x0c, 0x71, 0x9a, 0x50, 0x5e, 0xdc, 0x97, 0x35, 0xf2, 0x0b, 0x38, 0x54, 0x4d, 0x3d, 0x43, 0x5b, 0xe8, 0x65, 0xf1, 0x8a,
0x02, 0x17, 0x3b, 0xcc, 0x46, 0x4d, 0x3d, 0xec, 0xee, 0xd3, 0xc3, 0x70, 0x9c, 0x94, 0x16, 0x31, 0x2d, 0x25, 0x83, 0x94, 0xe5, 0xbb, 0x32, 0xf6, 0xe9, 0xc3, 0x72, 0x8a, 0x2d, 0x65, 0xef, 0x0c,
0x4a, 0x21, 0xee, 0x4a, 0x62, 0x03, 0x7b, 0xde, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xb3, 0x09, 0x78, 0xa8, 0x25, 0xa3, 0x15, 0x21, 0x1f, 0x41, 0x64, 0x7c, 0xfa, 0x39, 0xda, 0xa9, 0xe3, 0xdb,
0xb6, 0x16, 0x1c, 0x01, 0x00, 0x00, 0x41, 0xc2, 0x24, 0x98, 0x6f, 0x98, 0xdf, 0x41, 0x1c, 0x40, 0xde, 0x1f, 0x4d, 0x46, 0xe2, 0xef,
0x0f, 0x88, 0xfe, 0x1a, 0x92, 0x51, 0xaf, 0xa1, 0xec, 0x1a, 0xa2, 0xe0, 0x8c, 0x9f, 0xff, 0xd2,
0xb2, 0x74, 0x27, 0x8f, 0x7f, 0x4c, 0x4c, 0x9a, 0xb0, 0x8b, 0x27, 0xdf, 0x1f, 0x7f, 0x81, 0xe1,
0x57, 0x2d, 0x97, 0xff, 0xbf, 0x61, 0x83, 0x9e, 0x5e, 0x6d, 0x83, 0xfa, 0x56, 0x66, 0x7b, 0xdd,
0x86, 0x6e, 0x3e, 0x02, 0x00, 0x00, 0xff, 0xff, 0x28, 0x1a, 0xa8, 0xa6, 0xc1, 0x01, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.

View File

@@ -8,8 +8,14 @@ service PrivateKeyService {
message PrivateKeyRequest { message PrivateKeyRequest {
string pod_name = 1; string pod_name = 1;
uint64 number_of_keys = 2;
} }
message PrivateKeyResponse { message PrivateKeyResponse {
bytes private_key = 1; bytes private_key = 1 [deprecated=true];
PrivateKeys private_keys = 2;
}
message PrivateKeys {
repeated bytes private_keys = 1;
} }

View File

@@ -13,13 +13,13 @@ pods.
Workflow for bootstraping a validator pod Workflow for bootstraping a validator pod
1. Request a private key from the pk manager. 1. Request `n` private keys from the pk manager.
1. If an unallocated private key exists (from previously terminated pod), assign 1. If unallocated private keys exist (from previously terminated pods), assign
to the requesting pod. to the requesting pod.
1. If all available private keys are in use, generate a new private key, and 1. If there are not at least `n` keys not in use, generate new private keys,
make the deposit on behalf of this newly generated private key. and make the deposits on behalf of these newly generated private keys.
1. Write the assignment to some persistent datastore and fulfill the request. 1. Write the key allocations to a persistent datastore and fulfill the request.
1. The validator uses this private key to act as a deposited validator in the 1. The client uses these private keys to act as deposited validators in the
system. system.
## Server ## Server
@@ -27,6 +27,40 @@ Workflow for bootstraping a validator pod
The server manages the private key database, allocates new private keys, makes The server manages the private key database, allocates new private keys, makes
validator deposits, and fulfills requests from pods for private key allocation. validator deposits, and fulfills requests from pods for private key allocation.
### Database structure
There are two buckets for the server, unallocated keys and allocated keys.
Unallocated keys bucket:
| key | value |
|-------------|-------|
| private key | nil |
Allocated keys bucket:
| key | value |
|----------|----------------------|
| pod name | list of private keys |
### Key management design
There are two types of operations with regards to private keys:
- Allocate(podName, keys)
- UnallocateAllKeys(podName)
Allocating keys will first check and attempt to recycle existing, unused keys.
If there are no unused keys available (or not enough), new keys are deposited.
Unallocating keys happens when a pod is destroyed. This should return all of
that's pods' keys to the unallocated keys bucket.
### Assignments HTTP Page `/assignments`
The server exposes an HTTP page which maps pod names to public keys.
This may be useful for determining which logs to follow for a given validator.
## Client ## Client
The client makes the private key request with a given pod name and generates a The client makes the private key request with a given pod name and generates a

View File

@@ -17,6 +17,7 @@ var (
podName = flag.String("pod-name", "", "The name of the pod running this tool") podName = flag.String("pod-name", "", "The name of the pod running this tool")
keystoreDir = flag.String("keystore-dir", "", "The directory to generate keystore with received validator key") keystoreDir = flag.String("keystore-dir", "", "The directory to generate keystore with received validator key")
password = flag.String("keystore-password", "", "The password to unlock the new keystore") password = flag.String("keystore-password", "", "The password to unlock the new keystore")
numKeys = flag.Uint64("keys", 1, "The number of keys to request")
) )
func main() { func main() {
@@ -34,25 +35,30 @@ func main() {
client := pb.NewPrivateKeyServiceClient(conn) client := pb.NewPrivateKeyServiceClient(conn)
resp, err := client.Request(ctx, &pb.PrivateKeyRequest{PodName: *podName}) resp, err := client.Request(ctx, &pb.PrivateKeyRequest{
PodName: *podName,
NumberOfKeys: *numKeys,
})
if err != nil { if err != nil {
panic(err) panic(err)
} }
pk, err := bls.SecretKeyFromBytes(resp.PrivateKey) for i, privateKey := range resp.PrivateKeys.PrivateKeys {
if err != nil { pk, err := bls.SecretKeyFromBytes(privateKey)
panic(err) if err != nil {
} panic(err)
}
k := &keystore.Key{ k := &keystore.Key{
PublicKey: pk.PublicKey(), PublicKey: pk.PublicKey(),
SecretKey: pk, SecretKey: pk,
} }
validatorKeyFile := *keystoreDir + params.BeaconConfig().ValidatorPrivkeyFileName validatorKeyFile := *keystoreDir + params.BeaconConfig().ValidatorPrivkeyFileName + "-" + fmt.Sprintf("%d", i)
if err := store.StoreKey(validatorKeyFile, k, *password); err != nil { if err := store.StoreKey(validatorKeyFile, k, *password); err != nil {
panic(err) panic(err)
} }
fmt.Printf("New key written to %s\n", validatorKeyFile) fmt.Printf("New key written to %s\n", validatorKeyFile)
}
} }

View File

@@ -5,6 +5,7 @@ load("@io_bazel_rules_docker//container:container.bzl", "container_push")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"allocations.go",
"db.go", "db.go",
"logger.go", "logger.go",
"main.go", "main.go",
@@ -16,6 +17,7 @@ go_library(
deps = [ deps = [
"//contracts/deposit-contract:go_default_library", "//contracts/deposit-contract:go_default_library",
"//proto/cluster:go_default_library", "//proto/cluster:go_default_library",
"//shared/bls:go_default_library",
"//shared/keystore:go_default_library", "//shared/keystore:go_default_library",
"//shared/prometheus:go_default_library", "//shared/prometheus:go_default_library",
"//shared/ssz:go_default_library", "//shared/ssz:go_default_library",
@@ -25,6 +27,7 @@ go_library(
"@com_github_ethereum_go_ethereum//crypto:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library", "@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library", "@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library",
@@ -52,6 +55,7 @@ go_test(
go_image( go_image(
name = "image", name = "image",
srcs = [ srcs = [
"allocations.go",
"db.go", "db.go",
"logger.go", "logger.go",
"main.go", "main.go",
@@ -63,12 +67,14 @@ go_image(
deps = [ deps = [
"//contracts/deposit-contract:go_default_library", "//contracts/deposit-contract:go_default_library",
"//proto/cluster:go_default_library", "//proto/cluster:go_default_library",
"//shared/bls:go_default_library",
"//shared/keystore:go_default_library", "//shared/keystore:go_default_library",
"//shared/prometheus:go_default_library", "//shared/prometheus:go_default_library",
"//shared/ssz:go_default_library", "//shared/ssz:go_default_library",
"@com_github_boltdb_bolt//:go_default_library", "@com_github_boltdb_bolt//:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library", "@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//ethclient:go_default_library", "@com_github_ethereum_go_ethereum//ethclient:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library", "@com_github_ethereum_go_ethereum//rpc:go_default_library",

View File

@@ -0,0 +1,37 @@
package main
import (
"fmt"
"io"
"net/http"
)
func (s *server) serveAllocationsHTTPPage() {
mux := http.NewServeMux()
mux.HandleFunc("/allocations", func(w http.ResponseWriter, _ *http.Request) {
res := ""
a, err := s.db.Allocations()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
if _, err := io.WriteString(w, err.Error()); err != nil {
log.Error(err)
}
return
}
for podName, pubkeys := range a {
for _, pk := range pubkeys {
res += fmt.Sprintf("%s=%#x\n", podName, pk)
}
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
if _, err := io.WriteString(w, res); err != nil {
log.Error(err)
}
})
srv := &http.Server{Addr: ":8080", Handler: mux}
go srv.ListenAndServe()
log.Info("Serving allocations page at :8080/allocations")
}

View File

@@ -3,13 +3,15 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"path" "path"
"time" "time"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/keystore" "github.com/prysmaticlabs/prysm/shared/keystore"
) )
@@ -55,8 +57,20 @@ func newDB(dbPath string) *db {
} }
if err := boltdb.View(func(tx *bolt.Tx) error { if err := boltdb.View(func(tx *bolt.Tx) error {
keys := tx.Bucket(assignedPkBucket).Stats().KeyN keys := 0
// Iterate over all of the pod assigned keys (one to many).
c := tx.Bucket(assignedPkBucket).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(v, pks); err != nil {
return err
}
keys += len(pks.PrivateKeys)
}
assignedPkCount.Set(float64(keys)) assignedPkCount.Set(float64(keys))
// Add the unassigned keys count (one to one).
keys += tx.Bucket(unassignedPkBucket).Stats().KeyN keys += tx.Bucket(unassignedPkBucket).Stats().KeyN
allocatedPkCount.Add(float64(keys)) allocatedPkCount.Add(float64(keys))
return nil return nil
@@ -67,33 +81,36 @@ func newDB(dbPath string) *db {
return &db{db: boltdb} return &db{db: boltdb}
} }
// UnallocatedPK returns the first unassigned private key, if any are // UnallocatedPKs returns unassigned private keys, if any are available.
// available. func (d *db) UnallocatedPKs(_ context.Context, numKeys uint64) (*pb.PrivateKeys, error) {
func (d *db) UnallocatedPK(_ context.Context) ([]byte, error) { pks := &pb.PrivateKeys{}
var pk []byte
if err := d.db.View(func(tx *bolt.Tx) error { if err := d.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(unassignedPkBucket).Cursor() c := tx.Bucket(unassignedPkBucket).Cursor()
k, _ := c.First() i := uint64(0)
for k, _ := c.First(); k != nil && i < numKeys; k, _ = c.Next() {
pks.PrivateKeys = append(pks.PrivateKeys, k)
i++
}
pk = k
return nil return nil
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
return pk, nil return pks, nil
} }
// PodPK returns an assigned private key to the given pod name, if one exists. // PodPK returns an assigned private key to the given pod name, if one exists.
func (d *db) PodPK(_ context.Context, podName string) ([]byte, error) { func (d *db) PodPKs(_ context.Context, podName string) (*pb.PrivateKeys, error) {
var pk []byte pks := &pb.PrivateKeys{}
if err := d.db.View(func(tx *bolt.Tx) error { if err := d.db.View(func(tx *bolt.Tx) error {
pk = tx.Bucket(assignedPkBucket).Get([]byte(podName)) b := tx.Bucket(assignedPkBucket).Get([]byte(podName))
return nil
return proto.Unmarshal(b, pks)
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
return pk, nil return pks, nil
} }
// AllocateNewPkToPod records new private key assignment in DB. // AllocateNewPkToPod records new private key assignment in DB.
@@ -129,17 +146,31 @@ func (d *db) RemovePKAssignment(_ context.Context, podName string) error {
}) })
} }
// AssignExistingPK assigns a PK from the unassigned bucket to a given pod. // AssignExistingPKs assigns a PK from the unassigned bucket to a given pod.
func (d *db) AssignExistingPK(_ context.Context, pk []byte, podName string) error { func (d *db) AssignExistingPKs(_ context.Context, pks *pb.PrivateKeys, podName string) error {
return d.db.Update(func(tx *bolt.Tx) error { return d.db.Update(func(tx *bolt.Tx) error {
if !bytes.Equal(tx.Bucket(unassignedPkBucket).Get(pk), dummyVal) { for _, pk := range pks.PrivateKeys {
return errors.New("private key not in unassigned bucket") if bytes.Equal(tx.Bucket(unassignedPkBucket).Get(pk), dummyVal) {
if err := tx.Bucket(unassignedPkBucket).Delete(pk); err != nil {
return err
}
}
} }
if err := tx.Bucket(unassignedPkBucket).Delete(pk); err != nil { assignedPkCount.Add(float64(len(pks.PrivateKeys)))
// If pod assignment exists, append to it.
if existing := tx.Bucket(assignedPkBucket).Get([]byte(podName)); existing != nil {
existingKeys := &pb.PrivateKeys{}
if err := proto.Unmarshal(existing, existingKeys); err != nil {
pks.PrivateKeys = append(pks.PrivateKeys, existingKeys.PrivateKeys...)
}
}
data, err := proto.Marshal(pks)
if err != nil {
return err return err
} }
assignedPkCount.Inc() return tx.Bucket(assignedPkBucket).Put([]byte(podName), data)
return tx.Bucket(assignedPkBucket).Put([]byte(podName), pk)
}) })
return nil return nil
@@ -159,3 +190,32 @@ func (d *db) AllocatedPodNames(_ context.Context) ([]string, error) {
} }
return podNames, nil return podNames, nil
} }
func (d *db) Allocations() (map[string][][]byte, error) {
m := make(map[string][][]byte)
if err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(assignedPkBucket).ForEach(func(k, v []byte) error {
pks := &pb.PrivateKeys{}
if err := proto.Unmarshal(v, pks); err != nil {
return err
}
pubkeys := make([][]byte, len(pks.PrivateKeys))
for i, pk := range pks.PrivateKeys {
k, err := bls.SecretKeyFromBytes(pk)
if err != nil {
return err
}
pubkeys[i] = k.PublicKey().Marshal()
}
m[string(k)] = pubkeys
return nil
})
}); err != nil {
// do something
return nil, err
}
return m, nil
}

View File

@@ -15,7 +15,7 @@ var (
port = flag.Int("port", 8000, "The port to server gRPC") port = flag.Int("port", 8000, "The port to server gRPC")
metricsPort = flag.Int("metrics-port", 9090, "The port to serve /metrics") metricsPort = flag.Int("metrics-port", 9090, "The port to serve /metrics")
privateKey = flag.String("private-key", "", "The private key of funder") privateKey = flag.String("private-key", "", "The private key of funder")
rpcPath = flag.String("rpc", "", "RPC address of a running ETH1 node") rpcPath = flag.String("rpc", "https://goerli.prylabs.net", "RPC address of a running ETH1 node")
depositContractAddr = flag.String("deposit-contract", "", "Address of the deposit contract") depositContractAddr = flag.String("deposit-contract", "", "Address of the deposit contract")
depositAmount = flag.Int64("deposit-amount", 0, "The amount of wei to deposit into the contract") depositAmount = flag.Int64("deposit-amount", 0, "The amount of wei to deposit into the contract")
dbPath = flag.String("db-path", "", "The file path for database storage") dbPath = flag.String("db-path", "", "The file path for database storage")
@@ -40,6 +40,7 @@ func main() {
pb.RegisterPrivateKeyServiceServer(s, srv) pb.RegisterPrivateKeyServiceServer(s, srv)
go prometheus.RunSimpleServerOrDie(fmt.Sprintf(":%d", *metricsPort)) go prometheus.RunSimpleServerOrDie(fmt.Sprintf(":%d", *metricsPort))
srv.serveAllocationsHTTPPage()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil { if err != nil {

View File

@@ -67,9 +67,6 @@ func newServer(
} }
func (s *server) makeDeposit(data []byte) error { func (s *server) makeDeposit(data []byte) error {
s.clientLock.Lock()
defer s.clientLock.Unlock()
txOps := bind.NewKeyedTransactor(s.txPk) txOps := bind.NewKeyedTransactor(s.txPk)
txOps.Value = s.depositAmount txOps.Value = s.depositAmount
txOps.GasLimit = gasLimit txOps.GasLimit = gasLimit
@@ -83,57 +80,89 @@ func (s *server) makeDeposit(data []byte) error {
} }
func (s *server) Request(ctx context.Context, req *pb.PrivateKeyRequest) (*pb.PrivateKeyResponse, error) { func (s *server) Request(ctx context.Context, req *pb.PrivateKeyRequest) (*pb.PrivateKeyResponse, error) {
pk, err := s.db.PodPK(ctx, req.PodName) s.clientLock.Lock()
if err != nil { defer s.clientLock.Unlock()
return nil, err
} if req.NumberOfKeys == 0 {
if pk != nil { req.NumberOfKeys = 1
log.WithField("pod", req.PodName).Debug("Returning existing assignment")
return &pb.PrivateKeyResponse{PrivateKey: pk}, nil
} }
pk, err = s.db.UnallocatedPK(ctx) // build the list of PKs in the following order, until the requested
// amount is ready to return.
// - PKs already assigned to the pod
// - PKs that have not yet been allocated
// - PKs that are newly initialized with deposits
pks, err := s.db.PodPKs(ctx, req.PodName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if pk != nil { if pks != nil && len(pks.PrivateKeys) > 0 {
log.WithField("pod", req.PodName).Debug("Recycling existing private key") log.WithField("pod", req.PodName).Debug("Returning existing assignment(s)")
if err := s.db.AssignExistingPK(ctx, pk, req.PodName); err != nil { return &pb.PrivateKeyResponse{
PrivateKeys: pks,
}, nil
}
unallocated, err := s.db.UnallocatedPKs(ctx, req.NumberOfKeys)
if err != nil {
return nil, err
}
log.WithField(
"pod", req.PodName,
).WithField(
"keys", len(unallocated.PrivateKeys),
).Debug("Recycling existing private key(s)")
pks.PrivateKeys = append(pks.PrivateKeys, unallocated.PrivateKeys...)
if len(pks.PrivateKeys) < int(req.NumberOfKeys) {
c := int(req.NumberOfKeys) - len(pks.PrivateKeys)
newKeys, err := s.allocateNewKeys(ctx, req.PodName, c)
if err != nil {
return nil, err return nil, err
} }
return &pb.PrivateKeyResponse{PrivateKey: pk}, nil pks.PrivateKeys = append(pks.PrivateKeys, newKeys.PrivateKeys...)
} }
log.WithField("pod", req.PodName).Debug("Allocating a new private key") if err := s.db.AssignExistingPKs(ctx, pks, req.PodName); err != nil {
return s.allocateNewKey(ctx, req.PodName) return nil, err
}
return &pb.PrivateKeyResponse{PrivateKeys: pks}, nil
} }
func (s *server) allocateNewKey(ctx context.Context, podName string) (*pb.PrivateKeyResponse, error) { func (s *server) allocateNewKeys(ctx context.Context, podName string, numKeys int) (*pb.PrivateKeys, error) {
key, err := keystore.NewKey(rand.Reader) pks := make([][]byte, numKeys)
if err != nil {
return nil, err for i := 0; i < numKeys; i++ {
key, err := keystore.NewKey(rand.Reader)
if err != nil {
return nil, err
}
// Make the validator deposit
// NOTE: This uses the validator key as the withdrawal key
di, err := keystore.DepositInput(key /*depositKey*/, key /*withdrawalKey*/)
if err != nil {
return nil, err
}
serializedData := new(bytes.Buffer)
if err := ssz.Encode(serializedData, di); err != nil {
return nil, fmt.Errorf("could not serialize deposit data: %v", err)
}
// Do the actual deposit
if err := s.makeDeposit(serializedData.Bytes()); err != nil {
return nil, err
}
// Store in database
if err := s.db.AllocateNewPkToPod(ctx, key, podName); err != nil {
return nil, err
}
secret := key.SecretKey.Marshal()
pks[i] = secret
} }
// Make the validator deposit return &pb.PrivateKeys{PrivateKeys: pks}, nil
// NOTE: This uses the validator key as the withdrawal key
di, err := keystore.DepositInput(key /*depositKey*/, key /*withdrawalKey*/)
if err != nil {
return nil, err
}
serializedData := new(bytes.Buffer)
if err := ssz.Encode(serializedData, di); err != nil {
return nil, fmt.Errorf("could not serialize deposit data: %v", err)
}
// Do the actual deposit
if err := s.makeDeposit(serializedData.Bytes()); err != nil {
return nil, err
}
// Store in database
if err := s.db.AllocateNewPkToPod(ctx, key, podName); err != nil {
return nil, err
}
return &pb.PrivateKeyResponse{PrivateKey: key.SecretKey.Marshal()}, nil
} }