Compare commits

...

4 Commits

Author SHA1 Message Date
kasey
41008f9d3d cleanup for PR 2021-12-09 13:44:01 -06:00
kasey
b85859acec support fetching checkpoints from a beacon node 2021-12-03 12:04:41 -06:00
Kasey Kirkham
1523fbbd74 goimports 2021-12-03 12:04:04 -06:00
kasey
5edcfbd785 allow checkpoint or genesis origin; refactoring
some quick readability improvements and simplifying the logic enforcing
the startup ordering of the attestation processing routine
2021-12-02 17:41:54 -06:00
37 changed files with 1945 additions and 635 deletions

View File

@@ -0,0 +1,13 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["client.go"],
importpath = "github.com/prysmaticlabs/prysm/api/client/openapi",
visibility = ["//visibility:public"],
deps = [
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -0,0 +1,278 @@
package openapi
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strconv"
"time"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
)
const GET_WEAK_SUBJECTIVITY_CHECKPOINT_EPOCH_PATH = "/eth/v1alpha1/beacon/weak_subjectivity_checkpoint_epoch"
const GET_WEAK_SUBJECTIVITY_CHECKPOINT_PATH = "/eth/v1alpha1/beacon/weak_subjectivity_checkpoint"
const GET_SIGNED_BLOCK_PATH = "/eth/v2/beacon/blocks"
const GET_STATE_PATH = "/eth/v2/debug/beacon/states"
// ClientOpt is a functional option for the Client type (http.Client wrapper)
type ClientOpt func(*Client)
// WithTimeout sets the .Timeout attribute of the wrapped http.Client
func WithTimeout(timeout time.Duration) ClientOpt {
return func(c *Client) {
c.c.Timeout = timeout
}
}
// Client provides a collection of helper methods for calling the beacon node OpenAPI endpoints
type Client struct {
c *http.Client
host string
scheme string
}
func (c *Client) urlForPath(methodPath string) *url.URL {
u := &url.URL{
Scheme: c.scheme,
Host: c.host,
}
u.Path = path.Join(u.Path, methodPath)
return u
}
// NewClient constructs a new client with the provided options (ex WithTimeout).
// host is the base host + port used to construct request urls. This value can be
// a URL string, or NewClient will assume an http endpoint if just `host:port` is used.
func NewClient(host string, opts ...ClientOpt) (*Client, error) {
host, err := validHostname(host)
if err != nil {
return nil, err
}
c := &Client{
c: &http.Client{},
scheme: "http",
host: host,
}
for _, o := range opts {
o(c)
}
return c, nil
}
func validHostname(h string) (string, error) {
// try to parse as url (being permissive)
u, err := url.Parse(h)
if err == nil && u.Host != "" {
return u.Host, nil
}
// try to parse as host:port
host, port, err := net.SplitHostPort(h)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", host, port), nil
}
type checkpointEpochResponse struct {
Epoch string
}
// GetWeakSubjectivityCheckpointEpoch retrieves the epoch for a finalized block within the weak subjectivity period.
// The main use case for method is to find the slot that can be used to fetch a block within the subjectivity period
// which can be used to sync (as an alternative to syncing from genesis).
func (c *Client) GetWeakSubjectivityCheckpointEpoch() (uint64, error) {
u := c.urlForPath(GET_WEAK_SUBJECTIVITY_CHECKPOINT_EPOCH_PATH)
r, err := c.c.Get(u.String())
if err != nil {
return 0, err
}
if r.StatusCode != http.StatusOK {
return 0, non200Err(r)
}
jsonr := &checkpointEpochResponse{}
err = json.NewDecoder(r.Body).Decode(jsonr)
if err != nil {
return 0, err
}
return strconv.ParseUint(jsonr.Epoch, 10, 64)
}
type wscResponse struct {
BlockRoot string
StateRoot string
Epoch string
}
// GetWeakSubjectivityCheckpoint calls an entirely different rpc method than GetWeakSubjectivityCheckpointEpoch
// This endpoint is much slower, because it uses stategen to generate the BeaconState at the beginning of the
// weak subjectivity epoch. This also results in a different set of state+block roots, because this endpoint currently
// uses the state's latest_block_header for the block hash, while the checkpoint sync code assumes that the block
// is from the first slot in the epoch and the state is from the subesequent slot.
func (c *Client) GetWeakSubjectivityCheckpoint() (*ethpb.WeakSubjectivityCheckpoint, error) {
u := c.urlForPath(GET_WEAK_SUBJECTIVITY_CHECKPOINT_PATH)
r, err := c.c.Get(u.String())
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, non200Err(r)
}
v := &wscResponse{}
b := bytes.NewBuffer(nil)
bodyReader := io.TeeReader(r.Body, b)
err = json.NewDecoder(bodyReader).Decode(v)
if err != nil {
return nil, err
}
epoch, err := strconv.ParseUint(v.Epoch, 10, 64)
if err != nil {
return nil, err
}
blockRoot, err := base64.StdEncoding.DecodeString(v.BlockRoot)
if err != nil {
return nil, err
}
stateRoot, err := base64.StdEncoding.DecodeString(v.StateRoot)
if err != nil {
return nil, err
}
return &ethpb.WeakSubjectivityCheckpoint{
Epoch: types.Epoch(epoch),
BlockRoot: blockRoot,
StateRoot: stateRoot,
}, nil
}
// TODO: fix hardcoded pb type using sniff code
// GetBlockBySlot queries the beacon node API for the SignedBeaconBlockAltair for the given slot
func (c *Client) GetBlockBySlot(slot uint64) (*ethpb.SignedBeaconBlockAltair, error) {
blockPath := path.Join(GET_SIGNED_BLOCK_PATH, strconv.FormatUint(slot, 10))
u := c.urlForPath(blockPath)
log.Printf("requesting %s", u.String())
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/octet-stream")
r, err := c.c.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, non200Err(r)
}
v := &ethpb.SignedBeaconBlockAltair{}
b := new(bytes.Buffer)
_, err = b.ReadFrom(r.Body)
if err != nil {
return nil, err
}
err = v.UnmarshalSSZ(b.Bytes())
return v, err
}
// GetBlockByRoot retrieves a SignedBeaconBlockAltair with the given root via the beacon node API
func (c *Client) GetBlockByRoot(blockHex string) (*ethpb.SignedBeaconBlockAltair, error) {
blockPath := path.Join(GET_SIGNED_BLOCK_PATH, blockHex)
u := c.urlForPath(blockPath)
log.Printf("requesting %s", u.String())
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/octet-stream")
r, err := c.c.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, non200Err(r)
}
v := &ethpb.SignedBeaconBlockAltair{}
b := new(bytes.Buffer)
_, err = b.ReadFrom(r.Body)
if err != nil {
return nil, err
}
err = v.UnmarshalSSZ(b.Bytes())
return v, err
}
// GetStateByRoot retrieves a BeaconStateAltair with the given root via the beacon node API
func (c *Client) GetStateByRoot(stateHex string) (*ethpb.BeaconStateAltair, error) {
statePath := path.Join(GET_STATE_PATH, stateHex)
u := c.urlForPath(statePath)
log.Printf("requesting %s", u.String())
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/octet-stream")
r, err := c.c.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, non200Err(r)
}
v := &ethpb.BeaconStateAltair{}
b := new(bytes.Buffer)
_, err = b.ReadFrom(r.Body)
if err != nil {
return nil, err
}
err = v.UnmarshalSSZ(b.Bytes())
return v, err
}
// GetStateBySlot retrieves a BeaconStateAltair at the given slot via the beacon node API
func (c *Client) GetStateBySlot(slot uint64) (*ethpb.BeaconStateAltair, error) {
statePath := path.Join(GET_STATE_PATH, strconv.FormatUint(slot, 10))
u := c.urlForPath(statePath)
log.Printf("requesting %s", u.String())
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/octet-stream")
r, err := c.c.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, non200Err(r)
}
v := &ethpb.BeaconStateAltair{}
b := new(bytes.Buffer)
_, err = b.ReadFrom(r.Body)
if err != nil {
return nil, err
}
err = v.UnmarshalSSZ(b.Bytes())
return v, err
}
func non200Err(response *http.Response) error {
bodyBytes, err := ioutil.ReadAll(response.Body)
var body string
if err != nil {
body = "(Unable to read response body.)"
} else {
body = "response body:\n" + string(bodyBytes)
}
return fmt.Errorf("Got non-200 status code = %d requesting %s. %s", response.StatusCode, response.Request.URL, body)
}

View File

@@ -74,10 +74,22 @@ func (m *ApiProxyMiddleware) PrepareRequestForProxying(endpoint Endpoint, req *h
return nil
}
type ClientOption func(*http.Client)
func WithTimeout(t time.Duration) ClientOption {
return func(c *http.Client) {
c.Timeout = t
}
}
// ProxyRequest proxies the request to grpc-gateway.
func ProxyRequest(req *http.Request) (*http.Response, ErrorJson) {
func ProxyRequest(req *http.Request, opts ...ClientOption) (*http.Response, ErrorJson) {
// We do not use http.DefaultClient because it does not have any timeout.
// TODO: think about exposing this as a flag, or based on endpoint
netClient := &http.Client{Timeout: time.Minute * 2}
for _, o := range opts {
o(netClient)
}
grpcResp, err := netClient.Do(req)
if err != nil {
return nil, InternalServerErrorWithMessage(err, "could not proxy request")

View File

@@ -56,8 +56,8 @@ func TestFinalizedCheckpt_GenesisRootOk(t *testing.T) {
cp := &ethpb.Checkpoint{Root: genesisRoot[:]}
c := setupBeaconChain(t, beaconDB)
c.finalizedCheckpt = cp
c.genesisRoot = genesisRoot
assert.DeepEqual(t, c.genesisRoot[:], c.FinalizedCheckpt().Root)
c.originRoot = genesisRoot
assert.DeepEqual(t, c.originRoot[:], c.FinalizedCheckpt().Root)
}
func TestCurrentJustifiedCheckpt_CanRetrieve(t *testing.T) {
@@ -77,8 +77,8 @@ func TestJustifiedCheckpt_GenesisRootOk(t *testing.T) {
genesisRoot := [32]byte{'B'}
cp := &ethpb.Checkpoint{Root: genesisRoot[:]}
c.justifiedCheckpt = cp
c.genesisRoot = genesisRoot
assert.DeepEqual(t, c.genesisRoot[:], c.CurrentJustifiedCheckpt().Root)
c.originRoot = genesisRoot
assert.DeepEqual(t, c.originRoot[:], c.CurrentJustifiedCheckpt().Root)
}
func TestPreviousJustifiedCheckpt_CanRetrieve(t *testing.T) {
@@ -98,8 +98,8 @@ func TestPrevJustifiedCheckpt_GenesisRootOk(t *testing.T) {
cp := &ethpb.Checkpoint{Root: genesisRoot[:]}
c := setupBeaconChain(t, beaconDB)
c.prevJustifiedCheckpt = cp
c.genesisRoot = genesisRoot
assert.DeepEqual(t, c.genesisRoot[:], c.PreviousJustifiedCheckpt().Root)
c.originRoot = genesisRoot
assert.DeepEqual(t, c.originRoot[:], c.PreviousJustifiedCheckpt().Root)
}
func TestHeadSlot_CanRetrieve(t *testing.T) {

View File

@@ -46,11 +46,11 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) error {
// Get head from the fork choice service.
f := s.finalizedCheckpt
j := s.justifiedCheckpt
// To get head before the first justified epoch, the fork choice will start with genesis root
// To get head before the first justified epoch, the fork choice will start with origin root
// instead of zero hashes.
headStartRoot := bytesutil.ToBytes32(j.Root)
if headStartRoot == params.BeaconConfig().ZeroHash {
headStartRoot = s.genesisRoot
headStartRoot = s.originRoot
}
// In order to process head, fork choice store requires justified info.
@@ -277,8 +277,8 @@ func (s *Service) notifyNewHeadEvent(
newHeadStateRoot,
newHeadRoot []byte,
) error {
previousDutyDependentRoot := s.genesisRoot[:]
currentDutyDependentRoot := s.genesisRoot[:]
previousDutyDependentRoot := s.originRoot[:]
currentDutyDependentRoot := s.originRoot[:]
var previousDutyEpoch types.Epoch
currentDutyEpoch := slots.ToEpoch(newHeadSlot)

View File

@@ -158,7 +158,7 @@ func Test_notifyNewHeadEvent(t *testing.T) {
cfg: &config{
StateNotifier: notifier,
},
genesisRoot: [32]byte{1},
originRoot: [32]byte{1},
}
newHeadStateRoot := [32]byte{2}
newHeadRoot := [32]byte{3}
@@ -174,8 +174,8 @@ func Test_notifyNewHeadEvent(t *testing.T) {
Block: newHeadRoot[:],
State: newHeadStateRoot[:],
EpochTransition: false,
PreviousDutyDependentRoot: srv.genesisRoot[:],
CurrentDutyDependentRoot: srv.genesisRoot[:],
PreviousDutyDependentRoot: srv.originRoot[:],
CurrentDutyDependentRoot: srv.originRoot[:],
}
require.DeepSSZEqual(t, wanted, eventHead)
})
@@ -187,7 +187,7 @@ func Test_notifyNewHeadEvent(t *testing.T) {
cfg: &config{
StateNotifier: notifier,
},
genesisRoot: genesisRoot,
originRoot: genesisRoot,
}
epoch1Start, err := slots.EpochStart(1)
require.NoError(t, err)

View File

@@ -439,7 +439,7 @@ func (s *Service) deletePoolAtts(atts []*ethpb.Attestation) error {
// fork choice justification routine.
func (s *Service) ensureRootNotZeros(root [32]byte) [32]byte {
if root == params.BeaconConfig().ZeroHash {
return s.genesisRoot
return s.originRoot
}
return root
}

View File

@@ -733,10 +733,10 @@ func TestEnsureRootNotZeroHashes(t *testing.T) {
opts := testServiceOptsNoDB()
service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.genesisRoot = [32]byte{'a'}
service.originRoot = [32]byte{'a'}
r := service.ensureRootNotZeros(params.BeaconConfig().ZeroHash)
assert.Equal(t, service.genesisRoot, r, "Did not get wanted justified root")
assert.Equal(t, service.originRoot, r, "Did not get wanted justified root")
root := [32]byte{'b'}
r = service.ensureRootNotZeros(root)
assert.Equal(t, root, r, "Did not get wanted justified root")
@@ -917,7 +917,7 @@ func TestUpdateJustifiedInitSync(t *testing.T) {
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: gRoot[:]}))
beaconState, _ := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, beaconState, gRoot))
service.genesisRoot = gRoot
service.originRoot = gRoot
currentCp := &ethpb.Checkpoint{Epoch: 1}
service.justifiedCheckpt = currentCp
newCp := &ethpb.Checkpoint{Epoch: 2, Root: gRoot[:]}

View File

@@ -7,6 +7,10 @@ import (
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"github.com/prysmaticlabs/prysm/async/event"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
@@ -14,8 +18,6 @@ import (
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// AttestationStateFetcher allows for retrieving a beacon state corresponding to the block
@@ -103,45 +105,52 @@ func (s *Service) VerifyFinalizedConsistency(ctx context.Context, root []byte) e
}
// This routine processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) processAttestationsRoutine(subscribedToStateEvents chan<- struct{}) {
func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
subscribedToStateEvents <- struct{}{}
<-stateChannel
stateSub.Unsubscribe()
if s.genesisTime.IsZero() {
log.Warn("ProcessAttestations routine waiting for genesis time")
for s.genesisTime.IsZero() {
time.Sleep(1 * time.Second)
}
log.Warn("Genesis time received, now available to process attestations")
}
st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
stateSub := stateFeed.Subscribe(stateChannel)
go func() {
select {
case <-s.ctx.Done():
stateSub.Unsubscribe()
return
case <-st.C():
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
continue
}
s.processAttestations(s.ctx)
case <-stateChannel:
stateSub.Unsubscribe()
break
}
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root))
if err != nil {
log.Errorf("Unable to get justified balances for root %v w/ error %s", s.justifiedCheckpt.Root, err)
continue
if s.genesisTime.IsZero() {
log.Warn("ProcessAttestations routine waiting for genesis time")
for s.genesisTime.IsZero() {
time.Sleep(1 * time.Second)
}
if err := s.updateHead(s.ctx, balances); err != nil {
log.Warnf("Resolving fork due to new attestation: %v", err)
log.Warn("Genesis time received, now available to process attestations")
}
st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-s.ctx.Done():
return
case <-st.C():
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
continue
}
s.processAttestations(s.ctx)
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root))
if err != nil {
log.Errorf("Unable to get justified balances for root %v w/ error %s", s.justifiedCheckpt.Root, err)
continue
}
if err := s.updateHead(s.ctx, balances); err != nil {
log.Warnf("Resolving fork due to new attestation: %v", err)
}
}
}
}
}()
}
// This processes fork choice attestations from the pool to account for validator votes and fork choice.

View File

@@ -9,6 +9,8 @@ import (
"sync"
"time"
prysmTime "github.com/prysmaticlabs/prysm/time"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/async/event"
@@ -45,13 +47,14 @@ const headSyncMinEpochsAfterCheckpoint = 128
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
genesisRoot [32]byte
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
// originRoot is the genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
originRoot [32]byte
justifiedCheckpt *ethpb.Checkpoint
prevJustifiedCheckpt *ethpb.Checkpoint
bestJustifiedCheckpt *ethpb.Checkpoint
@@ -120,179 +123,17 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
// Start a blockchain service's main event loop.
func (s *Service) Start() {
beaconState := s.cfg.FinalizedStateAtStartUp
saved := s.cfg.FinalizedStateAtStartUp
// Make sure that attestation processor is subscribed and ready for state initializing event.
attestationProcessorSubscribed := make(chan struct{}, 1)
// If the chain has already been initialized, simply start the block processing routine.
if beaconState != nil && !beaconState.IsNil() {
log.Info("Blockchain data already exists in DB, initializing...")
s.genesisTime = time.Unix(int64(beaconState.GenesisTime()), 0)
s.cfg.AttService.SetGenesisTime(beaconState.GenesisTime())
if err := s.initializeChainInfo(s.ctx); err != nil {
log.Fatalf("Could not set up chain info: %v", err)
if saved != nil && !saved.IsNil() {
if err := s.startFromSavedState(saved); err != nil {
log.Fatal(err)
}
// We start a counter to genesis, if needed.
gState, err := s.cfg.BeaconDB.GenesisState(s.ctx)
if err != nil {
log.Fatalf("Could not retrieve genesis state: %v", err)
}
gRoot, err := gState.HashTreeRoot(s.ctx)
if err != nil {
log.Fatalf("Could not hash tree root genesis state: %v", err)
}
go slots.CountdownToGenesis(s.ctx, s.genesisTime, uint64(gState.NumValidators()), gRoot)
justifiedCheckpoint, err := s.cfg.BeaconDB.JustifiedCheckpoint(s.ctx)
if err != nil {
log.Fatalf("Could not get justified checkpoint: %v", err)
}
finalizedCheckpoint, err := s.cfg.BeaconDB.FinalizedCheckpoint(s.ctx)
if err != nil {
log.Fatalf("Could not get finalized checkpoint: %v", err)
}
// Resume fork choice.
s.justifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint)
s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint)
s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint)
s.finalizedCheckpt = ethpb.CopyCheckpoint(finalizedCheckpoint)
s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(finalizedCheckpoint)
s.resumeForkChoice(justifiedCheckpoint, finalizedCheckpoint)
ss, err := slots.EpochStart(s.finalizedCheckpt.Epoch)
if err != nil {
log.Fatalf("Could not get start slot of finalized epoch: %v", err)
}
h := s.headBlock().Block()
if h.Slot() > ss {
log.WithFields(logrus.Fields{
"startSlot": ss,
"endSlot": h.Slot(),
}).Info("Loading blocks to fork choice store, this may take a while.")
if err := s.fillInForkChoiceMissingBlocks(s.ctx, h, s.finalizedCheckpt, s.justifiedCheckpt); err != nil {
log.Fatalf("Could not fill in fork choice store missing blocks: %v", err)
}
}
// not attempting to save initial sync blocks here, because there shouldn't be until
// after the statefeed.Initialized event is fired (below)
if err := s.wsVerifier.VerifyWeakSubjectivity(s.ctx, s.finalizedCheckpt.Epoch); err != nil {
// Exit run time if the node failed to verify weak subjectivity checkpoint.
log.Fatalf("could not verify initial checkpoint provided for chain sync, with err=: %v", err)
}
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: s.genesisTime,
GenesisValidatorsRoot: beaconState.GenesisValidatorRoot(),
},
})
} else {
log.Info("Waiting to reach the validator deposit threshold to start the beacon chain...")
if s.cfg.ChainStartFetcher == nil {
log.Fatal("Not configured web3Service for POW chain")
return // return need for TestStartUninitializedChainWithoutConfigPOWChain.
if err := s.startFromPOWChain(); err != nil {
log.Fatal(err)
}
go func() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
<-attestationProcessorSubscribed
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.ChainStarted {
data, ok := event.Data.(*statefeed.ChainStartedData)
if !ok {
log.Error("event data is not type *statefeed.ChainStartedData")
return
}
log.WithField("starttime", data.StartTime).Debug("Received chain start event")
s.processChainStartTime(s.ctx, data.StartTime)
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return
}
}
}()
}
go s.processAttestationsRoutine(attestationProcessorSubscribed)
}
// processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1
// deposit contract, initializes the beacon chain's state, and kicks off the beacon chain.
func (s *Service) processChainStartTime(ctx context.Context, genesisTime time.Time) {
preGenesisState := s.cfg.ChainStartFetcher.PreGenesisState()
initializedState, err := s.initializeBeaconChain(ctx, genesisTime, preGenesisState, s.cfg.ChainStartFetcher.ChainStartEth1Data())
if err != nil {
log.Fatalf("Could not initialize beacon chain: %v", err)
}
// We start a counter to genesis, if needed.
gRoot, err := initializedState.HashTreeRoot(s.ctx)
if err != nil {
log.Fatalf("Could not hash tree root genesis state: %v", err)
}
go slots.CountdownToGenesis(ctx, genesisTime, uint64(initializedState.NumValidators()), gRoot)
// We send out a state initialized event to the rest of the services
// running in the beacon node.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: genesisTime,
GenesisValidatorsRoot: initializedState.GenesisValidatorRoot(),
},
})
}
// initializes the state and genesis block of the beacon chain to persistent storage
// based on a genesis timestamp value obtained from the ChainStart event emitted
// by the ETH1.0 Deposit Contract and the POWChain service of the node.
func (s *Service) initializeBeaconChain(
ctx context.Context,
genesisTime time.Time,
preGenesisState state.BeaconState,
eth1data *ethpb.Eth1Data) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain")
defer span.End()
s.genesisTime = genesisTime
unixTime := uint64(genesisTime.Unix())
genesisState, err := transition.OptimizedGenesisBeaconState(unixTime, preGenesisState, eth1data)
if err != nil {
return nil, errors.Wrap(err, "could not initialize genesis state")
}
if err := s.saveGenesisData(ctx, genesisState); err != nil {
return nil, errors.Wrap(err, "could not save genesis data")
}
log.Info("Initialized beacon chain genesis state")
// Clear out all pre-genesis data now that the state is initialized.
s.cfg.ChainStartFetcher.ClearPreGenesisData()
// Update committee shuffled indices for genesis epoch.
if err := helpers.UpdateCommitteeCache(genesisState, 0 /* genesis epoch */); err != nil {
return nil, err
}
if err := helpers.UpdateProposerIndicesInCache(ctx, genesisState); err != nil {
return nil, err
}
s.cfg.AttService.SetGenesisTime(genesisState.GenesisTime())
return genesisState, nil
}
// Stop the blockchain service's main event loop and associated goroutines.
@@ -312,7 +153,7 @@ func (s *Service) Stop() error {
// Status always returns nil unless there is an error condition that causes
// this service to be unhealthy.
func (s *Service) Status() error {
if s.genesisRoot == params.BeaconConfig().ZeroHash {
if s.originRoot == params.BeaconConfig().ZeroHash {
return errors.New("genesis state has not been created")
}
if runtime.NumGoroutine() > s.cfg.MaxRoutines {
@@ -321,61 +162,105 @@ func (s *Service) Status() error {
return nil
}
// This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db.
func (s *Service) saveGenesisData(ctx context.Context, genesisState state.BeaconState) error {
if err := s.cfg.BeaconDB.SaveGenesisData(ctx, genesisState); err != nil {
return errors.Wrap(err, "could not save genesis data")
}
genesisBlk, err := s.cfg.BeaconDB.GenesisBlock(ctx)
if err != nil || genesisBlk == nil || genesisBlk.IsNil() {
return fmt.Errorf("could not load genesis block: %v", err)
}
genesisBlkRoot, err := genesisBlk.Block().HashTreeRoot()
func (s *Service) startFromSavedState(saved state.BeaconState) error {
log.Info("Blockchain data already exists in DB, initializing...")
s.genesisTime = time.Unix(int64(saved.GenesisTime()), 0)
s.cfg.AttService.SetGenesisTime(saved.GenesisTime())
originRoot, err := s.originRootFromSavedState(s.ctx)
if err != nil {
return errors.Wrap(err, "could not get genesis block root")
return err
}
s.originRoot = originRoot
if err := s.initializeHeadFromDB(s.ctx); err != nil {
return errors.Wrap(err, "could not set up chain info")
}
spawnCountdownIfPreGenesis(s.ctx, s.genesisTime, s.cfg.BeaconDB)
justified, err := s.cfg.BeaconDB.JustifiedCheckpoint(s.ctx)
if err != nil {
return errors.Wrap(err, "could not get justified checkpoint")
}
s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(justified)
s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(justified)
s.justifiedCheckpt = ethpb.CopyCheckpoint(justified)
finalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(s.ctx)
if err != nil {
return errors.Wrap(err, "could not get finalized checkpoint")
}
s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(finalized)
s.finalizedCheckpt = ethpb.CopyCheckpoint(finalized)
store := protoarray.New(justified.Epoch, finalized.Epoch, bytesutil.ToBytes32(finalized.Root))
s.cfg.ForkChoiceStore = store
ss, err := slots.EpochStart(s.finalizedCheckpt.Epoch)
if err != nil {
return errors.Wrap(err, "could not get start slot of finalized epoch")
}
h := s.headBlock().Block()
if h.Slot() > ss {
log.WithFields(logrus.Fields{
"startSlot": ss,
"endSlot": h.Slot(),
}).Info("Loading blocks to fork choice store, this may take a while.")
if err := s.fillInForkChoiceMissingBlocks(s.ctx, h, s.finalizedCheckpt, s.justifiedCheckpt); err != nil {
return errors.Wrap(err, "could not fill in fork choice store missing blocks")
}
}
s.genesisRoot = genesisBlkRoot
s.cfg.StateGen.SaveFinalizedState(0 /*slot*/, genesisBlkRoot, genesisState)
// Finalized checkpoint at genesis is a zero hash.
genesisCheckpoint := genesisState.FinalizedCheckpoint()
s.justifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.finalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
if err := s.cfg.ForkChoiceStore.ProcessBlock(ctx,
genesisBlk.Block().Slot(),
genesisBlkRoot,
params.BeaconConfig().ZeroHash,
[32]byte{},
genesisCheckpoint.Epoch,
genesisCheckpoint.Epoch); err != nil {
log.Fatalf("Could not process genesis block for fork choice: %v", err)
// not attempting to save initial sync blocks here, because there shouldn't be any until
// after the statefeed.Initialized event is fired (below)
if err := s.wsVerifier.VerifyWeakSubjectivity(s.ctx, s.finalizedCheckpt.Epoch); err != nil {
// Exit run time if the node failed to verify weak subjectivity checkpoint.
return errors.Wrap(err, "could not verify initial checkpoint provided for chain sync")
}
s.setHead(genesisBlkRoot, genesisBlk, genesisState)
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: s.genesisTime,
GenesisValidatorsRoot: saved.GenesisValidatorRoot(),
},
})
s.spawnProcessAttestationsRoutine(s.cfg.StateNotifier.StateFeed())
return nil
}
// This gets called to initialize chain info variables using the finalized checkpoint stored in DB
func (s *Service) initializeChainInfo(ctx context.Context) error {
func (s *Service) originRootFromSavedState(ctx context.Context) ([32]byte, error) {
// first check if we have started from checkpoint sync and have a root
originRoot, err := s.cfg.BeaconDB.OriginCheckpointRoot(ctx)
if err == nil {
return originRoot, nil
}
// if it's an ErrNotFound, that probably just means the node was synced from genesis
if !errors.Is(err, db.ErrNotFound) {
return originRoot, errors.Wrap(err, "could not retrieve checkpoint sync chain origin data from db")
}
// if the chain started from a genesis state, we should have a value for GenesisBlock
genesisBlock, err := s.cfg.BeaconDB.GenesisBlock(ctx)
if err != nil {
return errors.Wrap(err, "could not get genesis block from db")
return originRoot, errors.Wrap(err, "could not get genesis block from db")
}
if err := helpers.BeaconBlockIsNil(genesisBlock); err != nil {
return err
return originRoot, err
}
genesisBlkRoot, err := genesisBlock.Block().HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not get signing root of genesis block")
return genesisBlkRoot, errors.Wrap(err, "could not get signing root of genesis block")
}
s.genesisRoot = genesisBlkRoot
return genesisBlkRoot, nil
}
// initializeHeadFromDB uses the finalized checkpoint and head block found in the database to set the current head
// note that this may block until stategen replays blocks between the finalized and head blocks
// if the head sync flag was specified and the gap between the finalized and head blocks is at least 128 epochs long
func (s *Service) initializeHeadFromDB(ctx context.Context) error {
finalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return errors.Wrap(err, "could not get finalized checkpoint from db")
@@ -442,11 +327,146 @@ func (s *Service) initializeChainInfo(ctx context.Context) error {
return nil
}
// This is called when a client starts from non-genesis slot. This passes last justified and finalized
// information to fork choice service to initializes fork choice store.
func (s *Service) resumeForkChoice(justifiedCheckpoint, finalizedCheckpoint *ethpb.Checkpoint) {
store := protoarray.New(justifiedCheckpoint.Epoch, finalizedCheckpoint.Epoch, bytesutil.ToBytes32(finalizedCheckpoint.Root))
s.cfg.ForkChoiceStore = store
func (s *Service) startFromPOWChain() error {
log.Info("Waiting to reach the validator deposit threshold to start the beacon chain...")
if s.cfg.ChainStartFetcher == nil {
return errors.New("not configured web3Service for POW chain")
}
go func() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
s.spawnProcessAttestationsRoutine(s.cfg.StateNotifier.StateFeed())
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.ChainStarted {
data, ok := event.Data.(*statefeed.ChainStartedData)
if !ok {
log.Error("event data is not type *statefeed.ChainStartedData")
return
}
log.WithField("starttime", data.StartTime).Debug("Received chain start event")
s.onPowchainStart(s.ctx, data.StartTime)
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return
}
}
}()
return nil
}
// onPowchainStart initializes a series of deposits from the ChainStart deposits in the eth1
// deposit contract, initializes the beacon chain's state, and kicks off the beacon chain.
func (s *Service) onPowchainStart(ctx context.Context, genesisTime time.Time) {
preGenesisState := s.cfg.ChainStartFetcher.PreGenesisState()
initializedState, err := s.initializeBeaconChain(ctx, genesisTime, preGenesisState, s.cfg.ChainStartFetcher.ChainStartEth1Data())
if err != nil {
log.Fatalf("Could not initialize beacon chain: %v", err)
}
// We start a counter to genesis, if needed.
gRoot, err := initializedState.HashTreeRoot(s.ctx)
if err != nil {
log.Fatalf("Could not hash tree root genesis state: %v", err)
}
go slots.CountdownToGenesis(ctx, genesisTime, uint64(initializedState.NumValidators()), gRoot)
// We send out a state initialized event to the rest of the services
// running in the beacon node.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: genesisTime,
GenesisValidatorsRoot: initializedState.GenesisValidatorRoot(),
},
})
}
// initializes the state and genesis block of the beacon chain to persistent storage
// based on a genesis timestamp value obtained from the ChainStart event emitted
// by the ETH1.0 Deposit Contract and the POWChain service of the node.
func (s *Service) initializeBeaconChain(
ctx context.Context,
genesisTime time.Time,
preGenesisState state.BeaconState,
eth1data *ethpb.Eth1Data) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain")
defer span.End()
s.genesisTime = genesisTime
unixTime := uint64(genesisTime.Unix())
genesisState, err := transition.OptimizedGenesisBeaconState(unixTime, preGenesisState, eth1data)
if err != nil {
return nil, errors.Wrap(err, "could not initialize genesis state")
}
if err := s.saveGenesisData(ctx, genesisState); err != nil {
return nil, errors.Wrap(err, "could not save genesis data")
}
log.Info("Initialized beacon chain genesis state")
// Clear out all pre-genesis data now that the state is initialized.
s.cfg.ChainStartFetcher.ClearPreGenesisData()
// Update committee shuffled indices for genesis epoch.
if err := helpers.UpdateCommitteeCache(genesisState, 0 /* genesis epoch */); err != nil {
return nil, err
}
if err := helpers.UpdateProposerIndicesInCache(ctx, genesisState); err != nil {
return nil, err
}
s.cfg.AttService.SetGenesisTime(genesisState.GenesisTime())
return genesisState, nil
}
// This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db.
func (s *Service) saveGenesisData(ctx context.Context, genesisState state.BeaconState) error {
if err := s.cfg.BeaconDB.SaveGenesisData(ctx, genesisState); err != nil {
return errors.Wrap(err, "could not save genesis data")
}
genesisBlk, err := s.cfg.BeaconDB.GenesisBlock(ctx)
if err != nil || genesisBlk == nil || genesisBlk.IsNil() {
return fmt.Errorf("could not load genesis block: %v", err)
}
genesisBlkRoot, err := genesisBlk.Block().HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not get genesis block root")
}
s.originRoot = genesisBlkRoot
s.cfg.StateGen.SaveFinalizedState(0 /*slot*/, genesisBlkRoot, genesisState)
// Finalized checkpoint at genesis is a zero hash.
genesisCheckpoint := genesisState.FinalizedCheckpoint()
s.justifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.finalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint)
if err := s.cfg.ForkChoiceStore.ProcessBlock(ctx,
genesisBlk.Block().Slot(),
genesisBlkRoot,
params.BeaconConfig().ZeroHash,
[32]byte{},
genesisCheckpoint.Epoch,
genesisCheckpoint.Epoch); err != nil {
log.Fatalf("Could not process genesis block for fork choice: %v", err)
}
s.setHead(genesisBlkRoot, genesisBlk, genesisState)
return nil
}
// This returns true if block has been processed before. Two ways to verify the block has been processed:
@@ -460,3 +480,20 @@ func (s *Service) hasBlock(ctx context.Context, root [32]byte) bool {
return s.cfg.BeaconDB.HasBlock(ctx, root)
}
func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
currentTime := prysmTime.Now()
if currentTime.After(genesisTime) {
return
}
gState, err := db.GenesisState(ctx)
if err != nil {
log.Fatalf("Could not retrieve genesis state: %v", err)
}
gRoot, err := gState.HashTreeRoot(ctx)
if err != nil {
log.Fatalf("Could not hash tree root genesis state: %v", err)
}
go slots.CountdownToGenesis(ctx, genesisTime, uint64(gState.NumValidators()), gRoot)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/prysm/async/event"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
@@ -283,9 +284,11 @@ func TestChainService_InitializeChainInfo(t *testing.T) {
require.NoError(t, beaconDB.SaveState(ctx, headState, genesisRoot))
require.NoError(t, beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(headBlock)))
require.NoError(t, beaconDB.SaveFinalizedCheckpoint(ctx, &ethpb.Checkpoint{Epoch: slots.ToEpoch(finalizedSlot), Root: headRoot[:]}))
c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}}
c.cfg.FinalizedStateAtStartUp = headState
require.NoError(t, c.initializeChainInfo(ctx))
attSrv, err := attestations.NewService(ctx, &attestations.Config{})
require.NoError(t, err)
c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{}), WithFinalizedStateAtStartUp(headState))
require.NoError(t, err)
require.NoError(t, c.startFromSavedState(headState))
headBlk, err := c.HeadBlock(ctx)
require.NoError(t, err)
assert.DeepEqual(t, headBlock, headBlk.Proto(), "Head block incorrect")
@@ -298,7 +301,7 @@ func TestChainService_InitializeChainInfo(t *testing.T) {
if !bytes.Equal(headRoot[:], r) {
t.Error("head slot incorrect")
}
assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect")
assert.Equal(t, genesisRoot, c.originRoot, "Genesis block root incorrect")
}
func TestChainService_InitializeChainInfo_SetHeadAtGenesis(t *testing.T) {
@@ -324,12 +327,15 @@ func TestChainService_InitializeChainInfo_SetHeadAtGenesis(t *testing.T) {
require.NoError(t, beaconDB.SaveState(ctx, headState, headRoot))
require.NoError(t, beaconDB.SaveState(ctx, headState, genesisRoot))
require.NoError(t, beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(headBlock)))
c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}}
require.NoError(t, c.initializeChainInfo(ctx))
attSrv, err := attestations.NewService(ctx, &attestations.Config{})
require.NoError(t, err)
c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{}))
require.NoError(t, err)
require.NoError(t, c.startFromSavedState(headState))
s, err := c.HeadState(ctx)
require.NoError(t, err)
assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect")
assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect")
assert.Equal(t, genesisRoot, c.originRoot, "Genesis block root incorrect")
assert.DeepEqual(t, genesis, c.head.block.Proto())
}
@@ -381,13 +387,15 @@ func TestChainService_InitializeChainInfo_HeadSync(t *testing.T) {
Root: finalizedRoot[:],
}))
c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}}
c.cfg.FinalizedStateAtStartUp = headState
require.NoError(t, c.initializeChainInfo(ctx))
attSrv, err := attestations.NewService(ctx, &attestations.Config{})
require.NoError(t, err)
c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{}), WithFinalizedStateAtStartUp(headState))
require.NoError(t, err)
require.NoError(t, c.startFromSavedState(headState))
s, err := c.HeadState(ctx)
require.NoError(t, err)
assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect")
assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect")
assert.Equal(t, genesisRoot, c.originRoot, "Genesis block root incorrect")
// Since head sync is not triggered, chain is initialized to the last finalization checkpoint.
assert.DeepEqual(t, finalizedBlock, c.head.block.Proto())
assert.LogsContain(t, hook, "resetting head from the checkpoint ('--head-sync' flag is ignored)")
@@ -404,11 +412,11 @@ func TestChainService_InitializeChainInfo_HeadSync(t *testing.T) {
require.NoError(t, beaconDB.SaveHeadBlockRoot(ctx, headRoot))
hook.Reset()
require.NoError(t, c.initializeChainInfo(ctx))
require.NoError(t, c.initializeHeadFromDB(ctx))
s, err = c.HeadState(ctx)
require.NoError(t, err)
assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect")
assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect")
assert.Equal(t, genesisRoot, c.originRoot, "Genesis block root incorrect")
// Head slot is far beyond the latest finalized checkpoint, head sync is triggered.
assert.DeepEqual(t, headBlock, c.head.block.Proto())
assert.LogsContain(t, hook, "Regenerating state from the last checkpoint at slot 225")
@@ -478,7 +486,7 @@ func TestProcessChainStartTime_ReceivedFeed(t *testing.T) {
stateChannel := make(chan *feed.Event, 1)
stateSub := service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
service.processChainStartTime(context.Background(), time.Now())
service.onPowchainStart(context.Background(), time.Now())
stateEvent := <-stateChannel
require.Equal(t, int(stateEvent.Type), statefeed.Initialized)

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"alias.go",
"db.go",
"errors.go",
"log.go",
"restore.go",
],

View File

@@ -0,0 +1,5 @@
package db
import "github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
var ErrNotFound = kv.ErrNotFound

View File

@@ -50,6 +50,9 @@ type ReadOnlyDatabase interface {
DepositContractAddress(ctx context.Context) ([]byte, error)
// Powchain operations.
PowchainData(ctx context.Context) (*v2.ETH1ChainData, error)
// Origin checkpoint sync support
OriginCheckpointRoot(ctx context.Context) ([32]byte, error)
}
// NoHeadAccessDatabase defines a struct without access to chain head data.
@@ -92,6 +95,9 @@ type HeadAccessDatabase interface {
LoadGenesis(ctx context.Context, r io.Reader) error
SaveGenesisData(ctx context.Context, state state.BeaconState) error
EnsureEmbeddedGenesis(ctx context.Context) error
// db support for checkpoint sync
SaveCheckpointInitialState(ctx context.Context, state io.Reader, block io.Reader) error
}
// SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum.

View File

@@ -10,6 +10,7 @@ go_library(
"checkpoint.go",
"deposit_contract.go",
"encoding.go",
"error.go",
"finalized_block_roots.go",
"genesis.go",
"kv.go",
@@ -24,6 +25,7 @@ go_library(
"state_summary.go",
"state_summary_cache.go",
"utils.go",
"wss.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv",
visibility = [
@@ -79,6 +81,7 @@ go_test(
"checkpoint_test.go",
"deposit_contract_test.go",
"encoding_test.go",
"error_test.go",
"finalized_block_roots_test.go",
"genesis_test.go",
"init_test.go",

View File

@@ -46,6 +46,24 @@ func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (block.SignedBeac
return blk, err
}
func (s *Store) OriginCheckpointRoot(ctx context.Context) ([32]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.OriginCheckpointRoot")
defer span.End()
var root [32]byte
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
rootSlice := bkt.Get(originCheckpointBlockKey)
if rootSlice == nil {
return ErrNotFoundOriginCheckpoint
}
copy(root[:], rootSlice)
return nil
})
return root, err
}
// HeadBlock returns the latest canonical block in the Ethereum Beacon Chain.
func (s *Store) HeadBlock(ctx context.Context) (block.SignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HeadBlock")
@@ -336,6 +354,17 @@ func (s *Store) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) er
})
}
// SaveCheckpointInitialBlockRoot saves the latest block header from the weak subjectivity
// initial sync state.
func (s *Store) SaveCheckpointInitialBlockRoot(ctx context.Context, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveCheckpointInitialBlockRoot")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(blocksBucket)
return bucket.Put(originCheckpointBlockKey, blockRoot[:])
})
}
// HighestSlotBlocksBelow returns the block with the highest slot below the input slot from the db.
func (s *Store) HighestSlotBlocksBelow(ctx context.Context, slot types.Slot) ([]block.SignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotBlocksBelow")

View File

@@ -0,0 +1,31 @@
package kv
import "errors"
var ErrNotFound = errors.New("not found in db")
var ErrNotFoundOriginCheckpoint = WrapDBError(ErrNotFound, "OriginCheckpointRoot")
var ErrNotFoundFinalizedCheckpoint = WrapDBError(ErrNotFound, "FinalizedCheckpoint")
func WrapDBError(e error, outer string) error {
return DBError{
Wraps: e,
Outer: errors.New(outer),
}
}
type DBError struct {
Wraps error
Outer error
}
func (e DBError) Error() string {
es := e.Outer.Error()
if e.Wraps != nil {
es += ": " + e.Wraps.Error()
}
return es
}
func (e DBError) Unwrap() error {
return e.Wraps
}

View File

@@ -0,0 +1,24 @@
package kv
import (
"errors"
"testing"
)
func TestWrappedSentinelError(t *testing.T) {
e := ErrNotFoundOriginCheckpoint
if !errors.Is(e, ErrNotFoundOriginCheckpoint) {
t.Error("expected that a copy of ErrNotFoundOriginCheckpoint would have an is-a relationship")
}
outer := errors.New("wrapped error")
e2 := DBError{Wraps: ErrNotFoundOriginCheckpoint, Outer: outer}
if !errors.Is(e2, ErrNotFoundOriginCheckpoint) {
t.Error("expected that errors.Is would know DBError wraps ErrNotFoundOriginCheckpoint")
}
// test that the innermost not found error is detected
if !errors.Is(e2, ErrNotFound) {
t.Error("expected that errors.Is would know ErrNotFoundOriginCheckpoint wraps ErrNotFound")
}
}

View File

@@ -46,6 +46,8 @@ var (
// Altair key used to identify object is altair compatible.
// Objects that are only compatible with altair should be prefixed with such key.
altairKey = []byte("altair")
// block root included in the beacon state used by weak subjectivity initial sync
originCheckpointBlockKey = []byte("checkpoint-initial-block-root")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
lastArchivedIndexKey = []byte("last-archived")

85
beacon-chain/db/kv/wss.go Normal file
View File

@@ -0,0 +1,85 @@
package kv
import (
"context"
"io"
"io/ioutil"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
statev2 "github.com/prysmaticlabs/prysm/beacon-chain/state/v2"
"github.com/prysmaticlabs/prysm/config/params"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
)
const SLOTS_PER_EPOCH = 32
// SaveStateToHead sets the current head state.
func (s *Store) SaveStateToHead(ctx context.Context, bs state.BeaconState) error {
return nil
}
// SaveInitialCheckpointState loads an ssz serialized BeaconState from an io.Reader
// (ex: an open file) and sets the given state to the head of the chain.
func (s *Store) SaveCheckpointInitialState(ctx context.Context, stateReader io.Reader, blockReader io.Reader) error {
// save block to database
blk := &ethpb.SignedBeaconBlockAltair{}
bb, err := ioutil.ReadAll(blockReader)
if err != nil {
return err
}
if err := blk.UnmarshalSSZ(bb); err != nil {
return errors.Wrap(err, "could not unmarshal checkpoint block")
}
wblk, err := wrapper.WrappedAltairSignedBeaconBlock(blk)
if err != nil {
return errors.Wrap(err, "could not wrap checkpoint block")
}
if err := s.SaveBlock(ctx, wblk); err != nil {
return errors.Wrap(err, "could not save checkpoint block")
}
blockRoot, err := blk.Block.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "could not compute HashTreeRoot of checkpoint block")
}
bs, err := statev2.InitializeFromSSZReader(stateReader)
if err != nil {
return errors.Wrap(err, "could not initialize checkpoint state from reader")
}
if err = s.SaveState(ctx, bs, blockRoot); err != nil {
return errors.Wrap(err, "could not save state")
}
if err = s.SaveStateSummary(ctx, &ethpb.StateSummary{
Slot: bs.Slot(),
Root: blockRoot[:],
}); err != nil {
return err
}
if err = s.SaveHeadBlockRoot(ctx, blockRoot); err != nil {
return errors.Wrap(err, "could not save head block root")
}
if err = s.SaveCheckpointInitialBlockRoot(ctx, blockRoot); err != nil {
return err
}
slotEpoch, err := blk.Block.Slot.SafeDivSlot(params.BeaconConfig().SlotsPerEpoch)
if err != nil {
return err
}
chkpt := &ethpb.Checkpoint{
Epoch: types.Epoch(slotEpoch),
Root: blockRoot[:],
}
if err = s.SaveJustifiedCheckpoint(ctx, chkpt); err != nil {
return errors.Wrap(err, "could not mark checkpoint sync block as justified")
}
if err = s.SaveFinalizedCheckpoint(ctx, chkpt); err != nil {
return errors.Wrap(err, "could not mark checkpoint sync block as finalized")
}
return nil
}

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"strconv"
"strings"
"time"
"github.com/prysmaticlabs/prysm/api/gateway/apimiddleware"
"github.com/prysmaticlabs/prysm/api/grpc"
@@ -21,6 +22,7 @@ type sszConfig struct {
sszPath string
fileName string
responseJson sszResponseJson
clientOpts []apimiddleware.ClientOption
}
func handleGetBeaconStateSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
@@ -28,6 +30,7 @@ func handleGetBeaconStateSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimi
sszPath: "/eth/v1/debug/beacon/states/{state_id}/ssz",
fileName: "beacon_state.ssz",
responseJson: &beaconStateSSZResponseJson{},
clientOpts: []apimiddleware.ClientOption{apimiddleware.WithTimeout(time.Minute * 4)},
}
return handleGetSSZ(m, endpoint, w, req, config)
}
@@ -46,6 +49,7 @@ func handleGetBeaconStateSSZV2(m *apimiddleware.ApiProxyMiddleware, endpoint api
sszPath: "/eth/v2/debug/beacon/states/{state_id}/ssz",
fileName: "beacon_state.ssz",
responseJson: &beaconStateSSZResponseV2Json{},
clientOpts: []apimiddleware.ClientOption{apimiddleware.WithTimeout(time.Minute * 4)},
}
return handleGetSSZ(m, endpoint, w, req, config)
}
@@ -74,7 +78,7 @@ func handleGetSSZ(
apimiddleware.WriteError(w, errJson, nil)
return true
}
grpcResponse, errJson := apimiddleware.ProxyRequest(req)
grpcResponse, errJson := apimiddleware.ProxyRequest(req, config.clientOpts...)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true

View File

@@ -550,6 +550,22 @@ func (bs *Server) chainHeadRetrieval(ctx context.Context) (*ethpb.ChainHead, err
}, nil
}
// GetWeakSubjectivityCheckpointEpoch only computes the epoch for the weak subjectivity checkpoint.
func (bs *Server) GetWeakSubjectivityCheckpointEpoch(ctx context.Context, _ *emptypb.Empty) (*ethpb.WeakSubjectivityCheckpointEpoch, error) {
hs, err := bs.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get head state")
}
wsEpoch, err := helpers.LatestWeakSubjectivityEpoch(ctx, hs)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get weak subjectivity epoch")
}
return &ethpb.WeakSubjectivityCheckpointEpoch{
Epoch: uint64(wsEpoch),
}, nil
}
// GetWeakSubjectivityCheckpoint retrieves weak subjectivity state root, block root, and epoch.
func (bs *Server) GetWeakSubjectivityCheckpoint(ctx context.Context, _ *emptypb.Empty) (*ethpb.WeakSubjectivityCheckpoint, error) {
hs, err := bs.HeadFetcher.HeadState(ctx)

View File

@@ -2,6 +2,8 @@ package v1
import (
"context"
"io"
"io/ioutil"
"runtime"
"sort"
@@ -30,6 +32,22 @@ var (
})
)
func InitializeFromSSZReader(r io.Reader) (*BeaconState, error) {
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return InitializeFromSSZBytes(b)
}
func InitializeFromSSZBytes(marshaled []byte) (*BeaconState, error) {
st := &ethpb.BeaconState{}
if err := st.UnmarshalSSZ(marshaled); err != nil {
return nil, err
}
return InitializeFromProtoUnsafe(st)
}
// InitializeFromProto the beacon state from a protobuf representation.
func InitializeFromProto(st *ethpb.BeaconState) (*BeaconState, error) {
return InitializeFromProtoUnsafe(proto.Clone(st).(*ethpb.BeaconState))
@@ -252,6 +270,23 @@ func (b *BeaconState) IsNil() bool {
return b == nil || b.state == nil
}
// Equal does a deep equality check to another BeaconState object using HashTreeRoot
// This method is expensive because it computes HTR for 2 beacon state values!
func (bs *BeaconState) Equal(ctx context.Context, other state.BeaconState) (bool, error) {
if other == nil || other.IsNil() {
return false, nil
}
bsr, err := bs.HashTreeRoot(ctx)
if err != nil {
return false, err
}
osr, err := other.HashTreeRoot(ctx)
if err != nil {
return false, err
}
return osr == bsr, nil
}
func (b *BeaconState) rootSelector(ctx context.Context, field types.FieldIndex) ([32]byte, error) {
ctx, span := trace.StartSpan(ctx, "beaconState.rootSelector")
defer span.End()

View File

@@ -2,6 +2,8 @@ package v2
import (
"context"
"io"
"io/ioutil"
"runtime"
"sort"
@@ -35,6 +37,22 @@ func InitializeFromProto(st *ethpb.BeaconStateAltair) (*BeaconState, error) {
return InitializeFromProtoUnsafe(proto.Clone(st).(*ethpb.BeaconStateAltair))
}
func InitializeFromSSZReader(r io.Reader) (*BeaconState, error) {
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return InitializeFromSSZBytes(b)
}
func InitializeFromSSZBytes(marshaled []byte) (*BeaconState, error) {
st := &ethpb.BeaconStateAltair{}
if err := st.UnmarshalSSZ(marshaled); err != nil {
return nil, err
}
return InitializeFromProtoUnsafe(st)
}
// InitializeFromProtoUnsafe directly uses the beacon state protobuf pointer
// and sets it as the inner state of the BeaconState type.
func InitializeFromProtoUnsafe(st *ethpb.BeaconStateAltair) (*BeaconState, error) {

21
cmd/prysmctl/BUILD.bazel Normal file
View File

@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "github.com/prysmaticlabs/prysm/cmd/prysmctl",
visibility = ["//visibility:private"],
deps = [
"//cmd/prysmctl/checkpoint:go_default_library",
"//cmd/prysmctl/get:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)
go_binary(
name = "prysmctl",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,18 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"latest.go",
"save.go",
],
importpath = "github.com/prysmaticlabs/prysm/cmd/prysmctl/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//api/client/openapi:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@@ -0,0 +1,15 @@
package checkpoint
import "github.com/urfave/cli/v2"
var Commands = []*cli.Command{
{
Name: "checkpoint",
Aliases: []string{"cpt"},
Usage: "commands for managing checkpoint syncing",
Subcommands: []*cli.Command{
latestCmd,
saveCmd,
},
},
}

View File

@@ -0,0 +1,80 @@
package checkpoint
import (
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"net"
"net/url"
"time"
"github.com/prysmaticlabs/prysm/api/client/openapi"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
var latestFlags = struct {
BeaconNodeHost string
Timeout string
}{}
var latestCmd = &cli.Command{
Name: "latest",
Usage: "Connect to a beacon-node server and print the block_root:epoch for the latest checkpoint.",
Action: cliActionLatest,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node to query",
Destination: &latestFlags.BeaconNodeHost,
Value: "localhost:3500",
},
&cli.StringFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 2m",
Destination: &latestFlags.Timeout,
Value: "2m",
},
},
}
func cliActionLatest(c *cli.Context) error {
f := latestFlags
opts := make([]openapi.ClientOpt, 0)
log.Printf("--beacon-node-url=%s", f.BeaconNodeHost)
timeout, err := time.ParseDuration(f.Timeout)
if err != nil {
return err
}
opts = append(opts, openapi.WithTimeout(timeout))
validatedHost, err := validHostname(latestFlags.BeaconNodeHost)
if err != nil {
return err
}
log.Printf("host:port=%s", validatedHost)
client, err := openapi.NewClient(validatedHost, opts...)
if err != nil {
return err
}
wsc, err := client.GetWeakSubjectivityCheckpoint()
if err != nil {
return err
}
log.Print("writing weak subjectivity results to stdout")
fmt.Printf("epoch: %d\nblock_root: %s\nstate_root: %s\n", int(wsc.Epoch), hexutil.Encode(wsc.BlockRoot), hexutil.Encode(wsc.StateRoot))
return nil
}
func validHostname(h string) (string, error) {
// try to parse as url (being permissive)
u, err := url.Parse(h)
if err == nil && u.Host != "" {
return u.Host, nil
}
// try to parse as host:port
host, port, err := net.SplitHostPort(h)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", host, port), nil
}

View File

@@ -0,0 +1,147 @@
package checkpoint
import (
"fmt"
"os"
"time"
"github.com/prysmaticlabs/prysm/api/client/openapi"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
const SLOTS_PER_EPOCH = 32
var saveFlags = struct {
BeaconNodeHost string
Timeout string
BlockHex string
BlockSavePath string
StateHex string
Epoch int
}{}
var saveCmd = &cli.Command{
Name: "save",
Usage: "query for the current weak subjectivity period epoch, then download the corresponding state and block. To be used for checkpoint sync.",
Action: cliActionSave,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node connection",
Destination: &saveFlags.BeaconNodeHost,
Value: "localhost:3500",
},
&cli.StringFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 2m",
Destination: &saveFlags.Timeout,
Value: "4m",
},
&cli.IntFlag{
Name: "epoch",
Usage: "instead of state-root, epoch can be used to find the BeaconState for the slot at the epoch boundary.",
Destination: &saveFlags.Epoch,
},
},
}
func cliActionSave(c *cli.Context) error {
f := saveFlags
opts := make([]openapi.ClientOpt, 0)
log.Printf("--beacon-node-url=%s", f.BeaconNodeHost)
timeout, err := time.ParseDuration(f.Timeout)
if err != nil {
return err
}
opts = append(opts, openapi.WithTimeout(timeout))
client, err := openapi.NewClient(saveFlags.BeaconNodeHost, opts...)
if err != nil {
return err
}
if saveFlags.Epoch > 0 {
return saveCheckpointByEpoch(client, uint64(saveFlags.Epoch))
}
return saveCheckpoint(client)
}
func saveCheckpoint(client *openapi.Client) error {
epoch, err := client.GetWeakSubjectivityCheckpointEpoch()
if err != nil {
return err
}
log.Printf("Beacon node computes the current weak subjectivity checkpoint as epoch = %d", epoch)
return saveCheckpointByEpoch(client, epoch)
}
func saveCheckpointByEpoch(client *openapi.Client, epoch uint64) error {
slot := epoch * SLOTS_PER_EPOCH
block, err := client.GetBlockBySlot(slot)
blockRoot, err := block.Block.HashTreeRoot()
if err != nil {
return err
}
blockRootHex := fmt.Sprintf("%#x", blockRoot)
log.Printf("retrieved block at slot %d with root=%s", slot, fmt.Sprintf("%#x", blockRoot))
blockStateRoot := block.Block.StateRoot
log.Printf("retrieved block has state root %s", fmt.Sprintf("%#x", blockStateRoot))
// assigning this variable to make it extra obvious that the state slot is different
stateSlot := slot + 1
// using the state at (slot % 32 = 1) instead of the epoch boundary ensures the
// next block applied to the state will have the block at the weak subjectivity checkpoint
// as its parent, satisfying prysm's sync code current verification that the parent block is present in the db
state, err := client.GetStateBySlot(stateSlot)
if err != nil {
return err
}
stateRoot, err := state.HashTreeRoot()
if err != nil {
return err
}
log.Printf("retrieved state for checkpoint at slot %d, w/ root=%s", slot, fmt.Sprintf("%#x", stateRoot))
latestBlockRoot, err := state.LatestBlockHeader.HashTreeRoot()
if err != nil {
return err
}
// we only want to provide checkpoints+state pairs where the state integrates the checkpoint block as its latest root
// this ensures that when syncing begins from the provided state, the next block in the chain can find the
// latest block in the db.
if blockRoot == latestBlockRoot {
log.Printf("State latest_block_header root matches block root=%#x", latestBlockRoot)
} else {
return fmt.Errorf("fatal error, state latest_block_header root=%#x, does not match block root=%#x", latestBlockRoot)
}
bb, err := block.MarshalSSZ()
if err != nil {
return err
}
blockPath := fmt.Sprintf("block-%s.ssz", blockRootHex)
log.Printf("saving ssz-encoded block to to %s", blockPath)
err = os.WriteFile(blockPath, bb, 0644)
if err != nil {
return err
}
sb, err := state.MarshalSSZ()
if err != nil {
return err
}
statePath := fmt.Sprintf("state-%s.ssz", fmt.Sprintf("%#x", stateRoot))
log.Printf("saving ssz-encoded state to to %s", statePath)
err = os.WriteFile(statePath, sb, 0644)
if err != nil {
return err
}
fmt.Println("To validate that your client is using this checkpoint, specify the following flag when starting prysm:")
fmt.Printf("--weak-subjectivity-checkpoint=%s:%d\n\n", blockRootHex, epoch)
fmt.Println("To sync a new beacon node starting from the checkpoint state, you may specify the following flags (assuming the files are in your current working directory)")
fmt.Printf("--checkpoint-state=%s --checkpoint-block=%s\n", statePath, blockPath)
return nil
}

View File

@@ -0,0 +1,18 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"block.go",
"get.go",
"state.go",
],
importpath = "github.com/prysmaticlabs/prysm/cmd/prysmctl/get",
visibility = ["//visibility:public"],
deps = [
"//api/client/openapi:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

89
cmd/prysmctl/get/block.go Normal file
View File

@@ -0,0 +1,89 @@
package get
import (
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prysmaticlabs/prysm/api/client/openapi"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"os"
"time"
)
var getBlockFlags = struct {
BeaconNodeHost string
Timeout string
BlockHex string
BlockSavePath string
}{}
var getBlockCmd = &cli.Command{
Name: "block",
Usage: "Retrieve ssz-encoded block data from a beacon node.",
Action: cliActionGetBlock,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node connection",
Destination: &getBlockFlags.BeaconNodeHost,
Required: true,
},
&cli.StringFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 2m",
Destination: &getBlockFlags.Timeout,
Value: "2m",
},
&cli.StringFlag{
Name: "block-root",
Usage: "block root (in 0x hex string format) used to retrieve the SignedBeaconBlock for checkpoint state.",
Destination: &getBlockFlags.BlockHex,
Required: true,
},
&cli.StringFlag{
Name: "block-save-path",
Usage: "path to file where block root should be saved. defaults to `block-<block_root>.ssz`",
Destination: &getBlockFlags.BlockSavePath,
},
},
}
func cliActionGetBlock(c *cli.Context) error {
f := getBlockFlags
if f.BlockHex != "" {
}
opts := make([]openapi.ClientOpt, 0)
log.Printf("beacon-node-url=%s", f.BeaconNodeHost)
timeout, err := time.ParseDuration(f.Timeout)
if err != nil {
return err
}
opts = append(opts, openapi.WithTimeout(timeout))
client, err := openapi.NewClient(f.BeaconNodeHost, opts...)
if err != nil {
return err
}
return saveBlock(client, f.BlockHex, f.BlockSavePath)
}
func saveBlock(client *openapi.Client, root, path string) error {
block, err := client.GetBlockByRoot(root)
if err != nil {
return err
}
blockRoot, err := block.Block.HashTreeRoot()
if err != nil {
return err
}
log.Printf("retrieved block for checkpoint, w/ block (header) root=%s", hexutil.Encode(blockRoot[:]))
if path == "" {
path = fmt.Sprintf("block-%s.ssz", root)
}
log.Printf("saving to %s...", path)
blockBytes, err := block.MarshalSSZ()
if err != nil {
return err
}
return os.WriteFile(path, blockBytes, 0644)
}

14
cmd/prysmctl/get/get.go Normal file
View File

@@ -0,0 +1,14 @@
package get
import "github.com/urfave/cli/v2"
var Commands = []*cli.Command{
{
Name: "get",
Usage: "commands for retrieving objects from a running beacon node",
Subcommands: []*cli.Command{
getBlockCmd,
getStateCmd,
},
},
}

72
cmd/prysmctl/get/state.go Normal file
View File

@@ -0,0 +1,72 @@
package get
import (
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prysmaticlabs/prysm/api/client/openapi"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"os"
)
var getStateFlags = struct {
BeaconNodeHost string
Timeout string
StateHex string
StateSavePath string
}{}
var getStateCmd = &cli.Command{
Name: "state",
Usage: "Download a state identified by slot or epoch",
Action: cliActionGetState,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "beacon-node-host",
Usage: "host:port for beacon node connection",
Destination: &getStateFlags.BeaconNodeHost,
Required: true,
},
&cli.StringFlag{
Name: "http-timeout",
Usage: "timeout for http requests made to beacon-node-url (uses duration format, ex: 2m31s). default: 2m",
Destination: &getStateFlags.Timeout,
Value: "2m",
},
&cli.StringFlag{
Name: "state-root",
Usage: "instead of epoch, state root (in 0x hex string format) can be used to retrieve from the beacon-node and save locally.",
Destination: &getStateFlags.StateHex,
},
&cli.StringFlag{
Name: "state-save-path",
Usage: "path to file where state root should be saved if specified. defaults to `state-<state_root>.ssz`",
Destination: &getStateFlags.StateSavePath,
},
},
}
func saveStateByRoot(client *openapi.Client, root, path string) error {
state, err := client.GetStateByRoot(root)
if err != nil {
return err
}
stateRoot, err := state.HashTreeRoot()
if err != nil {
return err
}
log.Printf("retrieved state for checkpoint, w/ root=%s", hexutil.Encode(stateRoot[:]))
if path == "" {
path = fmt.Sprintf("state-%s.ssz", root)
}
log.Printf("saving to %s...", path)
blockBytes, err := state.MarshalSSZ()
if err != nil {
return err
}
return os.WriteFile(path, blockBytes, 0644)
}
func cliActionGetState(c *cli.Context) error {
return nil
}

28
cmd/prysmctl/main.go Normal file
View File

@@ -0,0 +1,28 @@
package main
import (
"github.com/prysmaticlabs/prysm/cmd/prysmctl/get"
"os"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"github.com/prysmaticlabs/prysm/cmd/prysmctl/checkpoint"
)
var prysmctlCommands []*cli.Command
func main() {
app := &cli.App{
Commands: prysmctlCommands,
}
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
func init() {
prysmctlCommands = append(prysmctlCommands, checkpoint.Commands...)
prysmctlCommands = append(prysmctlCommands, get.Commands...)
}

File diff suppressed because it is too large Load Diff

View File

@@ -332,6 +332,24 @@ func local_request_BeaconChain_GetWeakSubjectivityCheckpoint_0(ctx context.Conte
}
func request_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(ctx context.Context, marshaler runtime.Marshaler, client BeaconChainClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.GetWeakSubjectivityCheckpointEpoch(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(ctx context.Context, marshaler runtime.Marshaler, server BeaconChainServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.GetWeakSubjectivityCheckpointEpoch(ctx, &protoReq)
return msg, metadata, err
}
var (
filter_BeaconChain_ListBeaconCommittees_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
@@ -1011,6 +1029,29 @@ func RegisterBeaconChainHandlerServer(ctx context.Context, mux *runtime.ServeMux
})
mux.Handle("GET", pattern_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ethereum.eth.v1alpha1.BeaconChain/GetWeakSubjectivityCheckpointEpoch")
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(rctx, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_BeaconChain_ListBeaconCommittees_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -1578,6 +1619,26 @@ func RegisterBeaconChainHandlerClient(ctx context.Context, mux *runtime.ServeMux
})
mux.Handle("GET", pattern_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/ethereum.eth.v1alpha1.BeaconChain/GetWeakSubjectivityCheckpointEpoch")
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_BeaconChain_ListBeaconCommittees_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -1884,6 +1945,8 @@ var (
pattern_BeaconChain_GetWeakSubjectivityCheckpoint_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"eth", "v1alpha1", "beacon", "weak_subjectivity_checkpoint"}, ""))
pattern_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"eth", "v1alpha1", "beacon", "weak_subjectivity_checkpoint_epoch"}, ""))
pattern_BeaconChain_ListBeaconCommittees_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"eth", "v1alpha1", "beacon", "committees"}, ""))
pattern_BeaconChain_ListValidatorBalances_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"eth", "v1alpha1", "validators", "balances"}, ""))
@@ -1936,6 +1999,8 @@ var (
forward_BeaconChain_GetWeakSubjectivityCheckpoint_0 = runtime.ForwardResponseMessage
forward_BeaconChain_GetWeakSubjectivityCheckpointEpoch_0 = runtime.ForwardResponseMessage
forward_BeaconChain_ListBeaconCommittees_0 = runtime.ForwardResponseMessage
forward_BeaconChain_ListValidatorBalances_0 = runtime.ForwardResponseMessage

View File

@@ -160,6 +160,15 @@ service BeaconChain {
};
}
// Retrieve the epoch that the server believes should be used as a weak subjectivity checkpoint.
// With the epoch, an rpc/api client can then download the BeaconState for the first slot, and then
// also download the latest block observed in the unmarshaled state for a complete checkpoint.
rpc GetWeakSubjectivityCheckpointEpoch(google.protobuf.Empty) returns (WeakSubjectivityCheckpointEpoch) {
option (google.api.http) = {
get: "/eth/v1alpha1/beacon/weak_subjectivity_checkpoint_epoch"
};
}
// Retrieve the beacon chain committees for a given epoch.
//
// If no filter criteria is specified, the response returns
@@ -914,3 +923,8 @@ message WeakSubjectivityCheckpoint {
// The epoch of weak subjectivity checkpoint.
uint64 epoch = 3 [(ethereum.eth.ext.cast_type) = "github.com/prysmaticlabs/eth2-types.Epoch"];
}
message WeakSubjectivityCheckpointEpoch {
// The epoch of the weak subjectivity checkpoint.
uint64 epoch = 1;
}