Replace context.Background() with more appropriate context (#7136)

* Replace context.Background() with more appropriate context
* replace a few context.TODO
* Merge refs/heads/master into fix-ctx
* Update validator/accounts/v2/accounts_create.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
* Fix tests
* fix stream tests
* gofmt
* fix conflicts and remove ctx background uses
* fix broken test
* Merge branch 'master' into fix-ctx
* imports
* Merge branch 'fix-ctx' of github.com:prysmaticlabs/prysm into fix-ctx
* fix conflicts
* fmt
* fixes tests
Preston Van Loon · 2020-09-09 04:48:52 -05:00 · committed by GitHub
parent 6d83770534
commit a74cf5de90
50 changed files with 120 additions and 109 deletions
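The changes below all follow one theme: instead of creating fresh `context.Background()` values deep inside long-running code, each service derives its context from a parent (the CLI context or the owning service) so cancellation and deadlines propagate on shutdown. A minimal sketch of that ownership pattern, using illustrative names rather than Prysm's actual types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Service owns a context derived from its parent; all background work uses s.ctx.
type Service struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// NewService derives a cancellable child context instead of starting from context.Background().
func NewService(ctx context.Context) *Service {
	ctx, cancel := context.WithCancel(ctx)
	return &Service{ctx: ctx, cancel: cancel}
}

// Stop cancels the service context, which also cancels anything derived from it.
func (s *Service) Stop() { s.cancel() }

func main() {
	parent, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := NewService(parent)
	go func() {
		<-s.ctx.Done() // unblocks when either the parent or the service is cancelled
		fmt.Println("service context done:", s.ctx.Err())
	}()

	s.Stop()
	time.Sleep(10 * time.Millisecond)
}
```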

View File

@@ -111,7 +111,7 @@ func (s *Service) processAttestation(subscribedToStateEvents chan struct{}) {
case <-s.ctx.Done():
return
case <-st.C():
ctx := context.Background()
ctx := s.ctx
atts := s.attPool.ForkchoiceAttestations()
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
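The per-tick work here now inherits the service context, so downstream calls are cancelled together with the service. A sketch of the same loop shape with stand-in names (`tick` and `process` are illustrative, not Prysm's actual identifiers):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, tick <-chan time.Time, process func(context.Context) error) {
	for {
		select {
		case <-ctx.Done():
			return // service shutting down; stop the loop
		case <-tick:
			// Pass the service context, not context.Background(),
			// so in-flight work is abandoned on shutdown.
			if err := process(ctx); err != nil {
				fmt.Println("processing failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	t := time.NewTicker(5 * time.Millisecond)
	defer t.Stop()

	go run(ctx, t.C, func(ctx context.Context) error { return ctx.Err() })

	time.Sleep(20 * time.Millisecond)
	cancel()
}
```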

View File

@@ -259,7 +259,7 @@ func (s *Service) initializeBeaconChain(
genesisTime time.Time,
preGenesisState *stateTrie.BeaconState,
eth1data *ethpb.Eth1Data) (*stateTrie.BeaconState, error) {
_, span := trace.StartSpan(context.Background(), "beacon-chain.Service.initializeBeaconChain")
ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain")
defer span.End()
s.genesisTime = genesisTime
unixTime := uint64(genesisTime.Unix())
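This fix matters for tracing as well as cancellation: `trace.StartSpan` returns a derived context carrying the span, and starting from `context.Background()` discards the parent/child relationship between spans. A minimal sketch of threading the caller's context through opencensus spans (assuming the `go.opencensus.io/trace` package already used in this file):

```go
package main

import (
	"context"

	"go.opencensus.io/trace"
)

func initialize(ctx context.Context) error {
	// Use the caller's ctx so this span becomes a child of any span already
	// attached to it, and keep the returned ctx for downstream calls.
	ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain")
	defer span.End()

	return step(ctx)
}

func step(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "beacon-chain.Service.step")
	defer span.End()
	return nil
}

func main() {
	_ = initialize(context.Background())
}
```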

View File

@@ -95,7 +95,7 @@ func GenesisBeaconState(deposits []*ethpb.Deposit, genesisTime uint64, eth1Data
return nil, err
}
state, err = b.ProcessPreGenesisDeposits(context.Background(), state, deposits)
state, err = b.ProcessPreGenesisDeposits(context.TODO(), state, deposits)
if err != nil {
return nil, errors.Wrap(err, "could not process validator deposits")
}

View File

@@ -36,7 +36,7 @@ func migrateArchivedIndex(tx *bolt.Tx) error {
continue
}
blk := &ethpb.SignedBeaconBlock{}
if err := decode(context.Background(), b, blk); err != nil {
if err := decode(context.TODO(), b, blk); err != nil {
return err
}
if err := tx.Bucket(stateSlotIndicesBucket).Put(bytesutil.Uint64ToBytesBigEndian(blk.Block.Slot), v); err != nil {
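Where a proper context is not yet plumbed through (genesis-state generation and the DB migration above), the change substitutes `context.TODO()` for `context.Background()`. Both return the same empty context, but `TODO` documents that wiring a real context is still outstanding. For illustration, with a hypothetical `migrate` helper:

```go
package main

import "context"

// migrate does not yet receive a context from its caller, so context.TODO()
// marks the gap explicitly; a later change can thread the real context through.
func migrate(decode func(ctx context.Context, b []byte) error, b []byte) error {
	return decode(context.TODO(), b)
}

func main() {
	_ = migrate(func(ctx context.Context, b []byte) error { return nil }, nil)
}
```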

View File

@@ -151,7 +151,7 @@ func NewBeaconNode(cliCtx *cli.Context) (*BeaconNode, error) {
registry := shared.NewServiceRegistry()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(cliCtx.Context)
beacon := &BeaconNode{
cliCtx: cliCtx,
ctx: ctx,
@@ -384,7 +384,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
}
}
svc, err := p2p.NewService(&p2p.Config{
svc, err := p2p.NewService(b.ctx, &p2p.Config{
NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name),
StaticPeers: sliceutil.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)),
BootstrapNodeAddr: bootnodeAddrs,
@@ -520,7 +520,7 @@ func (b *BeaconNode) registerSyncService() error {
return err
}
rs := prysmsync.NewRegularSync(&prysmsync.Config{
rs := prysmsync.NewRegularSync(b.ctx, &prysmsync.Config{
DB: b.db,
P2P: b.fetchP2P(),
Chain: chainService,
@@ -544,7 +544,7 @@ func (b *BeaconNode) registerInitialSyncService() error {
return err
}
is := initialsync.NewInitialSync(&initialsync.Config{
is := initialsync.NewInitialSync(b.ctx, &initialsync.Config{
DB: b.db,
Chain: chainService,
P2P: b.fetchP2P(),
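At the node level the root context now comes from urfave/cli's `cliCtx.Context` (whatever context the application was started with, e.g. via `app.RunContext`) instead of a detached `context.Background()`, and the resulting `b.ctx` is handed to the p2p and sync services. A sketch of that root wiring, with an illustrative action body:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	// The parent context can be cancelled by signal handling, tests, etc.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	app := &cli.App{
		Name: "node",
		Action: func(cliCtx *cli.Context) error {
			// Derive the node's context from the CLI context instead of
			// starting a fresh context.Background().
			nodeCtx, stop := context.WithCancel(cliCtx.Context)
			defer stop()

			<-nodeCtx.Done() // blocks until the parent is cancelled
			return nil
		},
	}

	go cancel() // simulate shutdown for the example

	if err := app.RunContext(ctx, os.Args[:1]); err != nil {
		log.Fatal(err)
	}
}
```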

View File

@@ -24,10 +24,9 @@ func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
for {
ctx := context.Background()
select {
case <-ticker.C:
if err := s.batchForkChoiceAtts(ctx); err != nil {
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
log.WithError(err).Error("Could not prepare attestations for fork choice")
}
case <-s.ctx.Done():
@@ -41,7 +40,7 @@ func (s *Service) prepareForkChoiceAtts() {
// pool. Then finds the common data, aggregate and batch them for fork choice.
// The resulting attestations are saved in the fork choice pool.
func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
defer span.End()
if err := s.pool.AggregateUnaggregatedAttestations(); err != nil {

View File

@@ -1,6 +1,7 @@
package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"math/rand"
@@ -180,7 +181,7 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
cfg.StaticPeers = staticPeers
cfg.StateNotifier = &mock.MockStateNotifier{}
cfg.NoDiscovery = true
s, err := NewService(cfg)
s, err := NewService(context.Background(), cfg)
require.NoError(t, err)
exitRoutine := make(chan bool)

View File

@@ -2,6 +2,7 @@ package p2p
import (
"bytes"
"context"
"math/rand"
"os"
"path"
@@ -86,7 +87,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err = NewService(cfg)
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)
s.genesisTime = genesisTime
s.genesisValidatorsRoot = make([]byte, 32)
@@ -174,7 +175,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err = NewService(cfg)
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)
s.genesisTime = genesisTime

View File

@@ -131,7 +131,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF {
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && err != io.EOF {
log.WithError(err).Trace("Handshake failed")
disconnectFromPeer()
return
@@ -160,8 +160,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
priorState = peers.PeerDisconnected
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)
ctx := context.Background()
if err := handler(ctx, conn.RemotePeer()); err != nil {
if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
log.WithError(err).Error("Disconnect handler failed")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)

View File

@@ -14,7 +14,7 @@ import (
)
func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) {
s, err := NewService(&Config{})
s, err := NewService(context.Background(), &Config{})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -88,9 +88,9 @@ type Service struct {
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
func NewService(cfg *Config) (*Service, error) {
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
var err error
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
_ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop().
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cacheNumCounters,

View File

@@ -71,7 +71,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
}
func TestService_Stop_SetsStartedToFalse(t *testing.T) {
s, err := NewService(&Config{})
s, err := NewService(context.Background(), &Config{})
require.NoError(t, err)
s.started = true
s.dv5Listener = &mockListener{}
@@ -80,7 +80,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) {
}
func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) {
s, err := NewService(&Config{})
s, err := NewService(context.Background(), &Config{})
require.NoError(t, err)
assert.NoError(t, s.Stop())
}
@@ -92,7 +92,7 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
TCPPort: 2000,
UDPPort: 2000,
}
s, err := NewService(cfg)
s, err := NewService(context.Background(), cfg)
require.NoError(t, err)
s.stateNotifier = &mock.MockStateNotifier{}
s.dv5Listener = &mockListener{}
@@ -193,7 +193,7 @@ func TestListenForNewNodes(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
s, err = NewService(cfg)
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)
s.stateNotifier = &mock.MockStateNotifier{}
exitRoutine := make(chan bool)
@@ -249,7 +249,7 @@ func TestPeer_Disconnect(t *testing.T) {
}
func TestService_JoinLeaveTopic(t *testing.T) {
s, err := NewService(&Config{})
s, err := NewService(context.Background(), &Config{})
require.NoError(t, err)
assert.Equal(t, 0, len(s.joinedTopics))

View File

@@ -79,7 +79,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
UDPPort: uint(port),
}
cfg.StateNotifier = &mock.MockStateNotifier{}
s, err = NewService(cfg)
s, err = NewService(context.Background(), cfg)
require.NoError(t, err)
exitRoutine := make(chan bool)
go func() {

View File

@@ -7,11 +7,11 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
)
func (s *Service) processDeposit(eth1Data *ethpb.Eth1Data, deposit *ethpb.Deposit) error {
func (s *Service) processDeposit(ctx context.Context, eth1Data *ethpb.Eth1Data, deposit *ethpb.Deposit) error {
var err error
if err := s.preGenesisState.SetEth1Data(eth1Data); err != nil {
return err
}
s.preGenesisState, err = blocks.ProcessPreGenesisDeposits(context.Background(), s.preGenesisState, []*ethpb.Deposit{deposit})
s.preGenesisState, err = blocks.ProcessPreGenesisDeposits(ctx, s.preGenesisState, []*ethpb.Deposit{deposit})
return err
}
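`processDeposit` previously manufactured its own background context; it now follows the Go convention of accepting `ctx context.Context` as the first parameter and forwarding what the caller (here `ProcessDepositLog`, below) already has. A generic sketch of that refactor with illustrative types:

```go
package example

import "context"

type Deposit struct{ Amount uint64 }

type beaconState struct{}

// depositService sketches the refactor: processDeposit takes ctx as its first
// parameter and forwards it, instead of creating context.Background() inside.
type depositService struct {
	state *beaconState
	apply func(ctx context.Context, st *beaconState, d Deposit) (*beaconState, error)
}

func (s *depositService) processDeposit(ctx context.Context, d Deposit) error {
	st, err := s.apply(ctx, s.state, d)
	if err != nil {
		return err
	}
	s.state = st
	return nil
}
```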

View File

@@ -36,7 +36,7 @@ func TestProcessDeposit_OK(t *testing.T) {
eth1Data, err := testutil.DeterministicEth1Data(len(deposits))
require.NoError(t, err)
err = web3Service.processDeposit(eth1Data, deposits[0])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0])
require.NoError(t, err, "could not process deposit")
valcount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0)
@@ -61,7 +61,7 @@ func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) {
deposits[0].Proof = [][]byte{{'f', 'a', 'k', 'e'}}
err = web3Service.processDeposit(eth1Data, deposits[0])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0])
require.NotNil(t, err, "No errors, when an error was expected")
want := "deposit merkle branch of deposit root did not verify for root"
@@ -99,7 +99,7 @@ func TestProcessDeposit_InvalidPublicKey(t *testing.T) {
DepositRoot: root[:],
}
err = web3Service.processDeposit(eth1Data, deposits[0])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0])
require.NoError(t, err)
require.LogsContain(t, hook, pubKeyErr)
@@ -134,7 +134,7 @@ func TestProcessDeposit_InvalidSignature(t *testing.T) {
DepositRoot: root[:],
}
err = web3Service.processDeposit(eth1Data, deposits[0])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0])
require.NoError(t, err)
require.LogsContain(t, hook, pubKeyErr)
@@ -166,7 +166,7 @@ func TestProcessDeposit_UnableToVerify(t *testing.T) {
proof, err := trie.MerkleProof(0)
require.NoError(t, err)
deposits[0].Proof = proof
err = web3Service.processDeposit(eth1Data, deposits[0])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0])
require.NoError(t, err)
want := "signature did not verify"
@@ -225,7 +225,7 @@ func TestProcessDeposit_IncompleteDeposit(t *testing.T) {
deposit.Proof, err = trie.MerkleProof(i)
require.NoError(t, err)
err = web3Service.processDeposit(eth1Data, deposit)
err = web3Service.processDeposit(context.Background(), eth1Data, deposit)
require.NoError(t, err, fmt.Sprintf("Could not process deposit at %d", i))
valcount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0)
@@ -251,7 +251,7 @@ func TestProcessDeposit_AllDepositedSuccessfully(t *testing.T) {
for i := range keys {
eth1Data.DepositCount = uint64(i + 1)
err = web3Service.processDeposit(eth1Data, deposits[i])
err = web3Service.processDeposit(context.Background(), eth1Data, deposits[i])
require.NoError(t, err, fmt.Sprintf("Could not process deposit at %d", i))
valCount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0)

View File

@@ -171,7 +171,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
DepositRoot: root[:],
DepositCount: uint64(len(s.chainStartData.ChainstartDeposits)),
}
if err := s.processDeposit(eth1Data, deposit); err != nil {
if err := s.processDeposit(ctx, eth1Data, deposit); err != nil {
log.Errorf("Invalid deposit processed: %v", err)
validData = false
}

View File

@@ -327,7 +327,7 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) {
return false, errors.Wrap(err, "could not get deposit count")
}
count := bytesutil.FromBytes8(countByte)
deposits := s.depositCache.AllDeposits(context.TODO(), nil)
deposits := s.depositCache.AllDeposits(s.ctx, nil)
if count != uint64(len(deposits)) {
return false, nil
}
@@ -606,6 +606,7 @@ func safelyHandlePanic() {
func (s *Service) handleETH1FollowDistance() {
defer safelyHandlePanic()
ctx := s.ctx
// use a 5 minutes timeout for block time, because the max mining time is 278 sec (block 7208027)
// (analyzed the time of the block from 2018-09-01 to 2019-02-13)
@@ -615,7 +616,7 @@ func (s *Service) handleETH1FollowDistance() {
log.Warn("eth1 client is not syncing")
}
if !s.chainStartData.Chainstarted {
if err := s.checkBlockNumberForChainStart(context.Background(), big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
if err := s.checkBlockNumberForChainStart(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
s.runError = err
log.Error(err)
return
@@ -627,7 +628,7 @@ func (s *Service) handleETH1FollowDistance() {
if s.latestEth1Data.LastRequestedBlock == s.latestEth1Data.BlockHeight {
return
}
if err := s.requestBatchedLogs(context.Background()); err != nil {
if err := s.requestBatchedLogs(ctx); err != nil {
s.runError = err
log.Error(err)
return
@@ -646,6 +647,7 @@ func (s *Service) initPOWService() {
case <-s.ctx.Done():
return
default:
ctx := s.ctx
err := s.initDataFromContract()
if err != nil {
log.Errorf("Unable to retrieve data from deposit contract %v", err)
@@ -653,7 +655,7 @@ func (s *Service) initPOWService() {
continue
}
header, err := s.eth1DataFetcher.HeaderByNumber(context.Background(), nil)
header, err := s.eth1DataFetcher.HeaderByNumber(ctx, nil)
if err != nil {
log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err)
s.retryETH1Node(err)
@@ -664,7 +666,7 @@ func (s *Service) initPOWService() {
s.latestEth1Data.BlockHash = header.Hash().Bytes()
s.latestEth1Data.BlockTime = header.Time
if err := s.processPastLogs(context.Background()); err != nil {
if err := s.processPastLogs(ctx); err != nil {
log.Errorf("Unable to process past logs %v", err)
s.retryETH1Node(err)
continue

View File

@@ -160,7 +160,7 @@ func TestGetBlock_AddsUnaggregatedAtts(t *testing.T) {
// we don't care for the purpose of this test.
var atts []*ethpb.Attestation
for i := uint64(0); len(atts) < int(params.BeaconConfig().MaxAttestations); i++ {
a, err := testutil.GenerateAttestations(beaconState, privKeys, 2, 1, true)
a, err := testutil.GenerateAttestations(beaconState, privKeys, 4, 1, true)
require.NoError(t, err)
atts = append(atts, a...)
}

View File

@@ -158,7 +158,7 @@ func (vs *Server) CanonicalHead(ctx context.Context, req *ptypes.Empty) (*ethpb.
// subscribes to an event stream triggered by the powchain service whenever the ChainStart log does
// occur in the Deposit Contract on ETH 1.0.
func (vs *Server) WaitForChainStart(req *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForChainStartServer) error {
head, err := vs.HeadFetcher.HeadState(context.Background())
head, err := vs.HeadFetcher.HeadState(stream.Context())
if err != nil {
return status.Errorf(codes.Internal, "Could not retrieve head state: %v", err)
}
@@ -213,7 +213,7 @@ func (vs *Server) WaitForChainStart(req *ptypes.Empty, stream ethpb.BeaconNodeVa
// WaitForSynced subscribes to the state channel and ends the stream when the state channel
// indicates the beacon node has been initialized and is ready
func (vs *Server) WaitForSynced(req *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForSyncedServer) error {
head, err := vs.HeadFetcher.HeadState(context.Background())
head, err := vs.HeadFetcher.HeadState(stream.Context())
if err != nil {
return status.Errorf(codes.Internal, "Could not retrieve head state: %v", err)
}
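For server-streaming RPCs, the handler should scope its work to `stream.Context()`, which gRPC cancels when the client goes away or the connection drops; a detached `context.Background()` would let the head-state fetch outlive the stream. A schematic handler (the `chainStartStream` interface below is a stand-in for the generated `ethpb.BeaconNodeValidator_WaitForChainStartServer`):

```go
package example

import "context"

// chainStartStream is a stand-in for a generated gRPC server-stream interface;
// the real one also has Send and the other grpc.ServerStream methods.
type chainStartStream interface {
	Context() context.Context
}

type headFetcher interface {
	HeadState(ctx context.Context) (interface{}, error)
}

type server struct{ head headFetcher }

func (s *server) WaitForChainStart(stream chainStartStream) error {
	// Scope the head-state lookup to the stream's lifetime.
	_, err := s.head.HeadState(stream.Context())
	return err
}
```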

View File

@@ -300,6 +300,7 @@ func TestWaitForChainStart_ContextClosed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidator_WaitForChainStartServer(ctrl)
mockStream.EXPECT().Context().Return(ctx)
go func(tt *testing.T) {
err := Server.WaitForChainStart(&ptypes.Empty{}, mockStream)
assert.ErrorContains(tt, "Context canceled", err)
@@ -337,6 +338,7 @@ func TestWaitForChainStart_AlreadyStarted(t *testing.T) {
GenesisTime: uint64(time.Unix(0, 0).Unix()),
},
).Return(nil)
mockStream.EXPECT().Context().Return(context.Background())
assert.NoError(t, Server.WaitForChainStart(&ptypes.Empty{}, mockStream), "Could not call RPC method")
}
@@ -359,6 +361,7 @@ func TestWaitForChainStart_HeadStateDoesNotExist(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidator_WaitForChainStartServer(ctrl)
mockStream.EXPECT().Context().Return(context.Background())
wg := new(sync.WaitGroup)
wg.Add(1)
@@ -402,6 +405,7 @@ func TestWaitForChainStart_NotStartedThenLogFired(t *testing.T) {
GenesisTime: uint64(time.Unix(0, 0).Unix()),
},
).Return(nil)
mockStream.EXPECT().Context().Return(context.Background())
go func(tt *testing.T) {
assert.NoError(tt, Server.WaitForChainStart(&ptypes.Empty{}, mockStream))
<-exitRoutine
@@ -441,6 +445,7 @@ func TestWaitForSynced_ContextClosed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mock.NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl)
mockStream.EXPECT().Context().Return(context.Background())
go func(tt *testing.T) {
err := Server.WaitForSynced(&ptypes.Empty{}, mockStream)
assert.ErrorContains(tt, "Context canceled", err)
@@ -479,6 +484,7 @@ func TestWaitForSynced_AlreadySynced(t *testing.T) {
GenesisTime: uint64(time.Unix(0, 0).Unix()),
},
).Return(nil)
mockStream.EXPECT().Context().Return(context.Background())
assert.NoError(t, Server.WaitForSynced(&ptypes.Empty{}, mockStream), "Could not call RPC method")
}
@@ -506,6 +512,7 @@ func TestWaitForSynced_NotStartedThenLogFired(t *testing.T) {
GenesisTime: uint64(time.Unix(0, 0).Unix()),
},
).Return(nil)
mockStream.EXPECT().Context().Return(context.Background())
go func(tt *testing.T) {
assert.NoError(tt, Server.WaitForSynced(&ptypes.Empty{}, mockStream), "Could not call RPC method")
<-exitRoutine
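Because the handlers now read `stream.Context()`, every mock stream in these tests has to expect that call, which is what the added `mockStream.EXPECT().Context().Return(ctx)` lines do. Without gomock, the same thing can be done with a hand-rolled fake; this sketch continues the stand-in types from the previous example and assumes they live in the same package:

```go
package example

import (
	"context"
	"testing"
)

// fakeStream satisfies the stand-in stream interface for tests.
type fakeStream struct{ ctx context.Context }

func (f *fakeStream) Context() context.Context { return f.ctx }

// headFuncs adapts a function to the headFetcher interface.
type headFuncs func(ctx context.Context) (interface{}, error)

func (f headFuncs) HeadState(ctx context.Context) (interface{}, error) { return f(ctx) }

func TestWaitForChainStart_UsesStreamContext(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the client going away before the call

	srv := &server{head: headFuncs(func(ctx context.Context) (interface{}, error) {
		return nil, ctx.Err() // the handler should see the cancelled stream context
	})}

	if err := srv.WaitForChainStart(&fakeStream{ctx: ctx}); err == nil {
		t.Fatal("expected an error from the cancelled stream context")
	}
}
```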

View File

@@ -41,7 +41,7 @@ type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBloc
// after the finalized epoch, request blocks to head from some subset of peers
// where step = 1.
func (s *Service) roundRobinSync(genesis time.Time) error {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
defer s.chain.ClearCachedStates()
state.SkipSlotCache.Disable()

View File

@@ -298,6 +298,7 @@ func TestService_roundRobinSync(t *testing.T) {
},
} // no-op mock
s := &Service{
ctx: context.Background(),
chain: mc,
p2p: p,
db: beaconDB,
@@ -329,7 +330,7 @@ func TestService_processBlock(t *testing.T) {
err = beaconDB.SaveBlock(context.Background(), genesisBlk)
require.NoError(t, err)
st := testutil.NewBeaconState()
s := NewInitialSync(&Config{
s := NewInitialSync(context.Background(), &Config{
P2P: p2pt.NewTestP2P(t),
DB: beaconDB,
Chain: &mock.ChainService{
@@ -388,7 +389,7 @@ func TestService_processBlockBatch(t *testing.T) {
err = beaconDB.SaveBlock(context.Background(), genesisBlk)
require.NoError(t, err)
st := testutil.NewBeaconState()
s := NewInitialSync(&Config{
s := NewInitialSync(context.Background(), &Config{
P2P: p2pt.NewTestP2P(t),
DB: beaconDB,
Chain: &mock.ChainService{
@@ -527,6 +528,7 @@ func TestService_blockProviderScoring(t *testing.T) {
},
} // no-op mock
s := &Service{
ctx: context.Background(),
chain: mc,
p2p: p,
db: beaconDB,

View File

@@ -58,8 +58,8 @@ type Service struct {
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
func NewInitialSync(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
@@ -192,7 +192,7 @@ func (s *Service) Resync() error {
// set it to false since we are syncing again
s.synced = false
defer func() { s.synced = true }() // Reset it at the end of the method.
headState, err := s.chain.HeadState(context.Background())
headState, err := s.chain.HeadState(s.ctx)
if err != nil {
return errors.Wrap(err, "could not retrieve head state")
}

View File

@@ -23,12 +23,11 @@ var processPendingAttsPeriod = slotutil.DivideSlotBy(2 /* twice per slot */)
// This processes pending attestation queues on every `processPendingAttsPeriod`.
func (s *Service) processPendingAttsQueue() {
ctx := context.Background()
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
mutex := new(sync.Mutex)
runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
mutex.Lock()
if err := s.processPendingAtts(ctx); err != nil {
if err := s.processPendingAtts(s.ctx); err != nil {
log.WithError(err).Debugf("Could not process pending attestation: %v", err)
}
mutex.Unlock()

View File

@@ -28,12 +28,11 @@ const numOfTries = 5
// processes pending blocks queue on every processPendingBlocksPeriod
func (s *Service) processPendingBlocksQueue() {
ctx := context.Background()
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
locker := new(sync.Mutex)
runutil.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
locker.Lock()
if err := s.processPendingBlocks(ctx); err != nil {
if err := s.processPendingBlocks(s.ctx); err != nil {
log.WithError(err).Debug("Failed to process pending blocks")
}
locker.Unlock()
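Both pending-queue processors drop their locally created background context and let the periodic runner's closure use `s.ctx`, so queue work stops once the service context is done. A sketch of a RunEvery-style helper with that behavior (illustrative; Prysm's `runutil.RunEvery` plays this role in the diff):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runEvery calls fn on the given period until ctx is cancelled.
func runEvery(ctx context.Context, period time.Duration, fn func()) {
	ticker := time.NewTicker(period)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fn()
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var mu sync.Mutex // prevents overlapping queue runs, as in the diff

	runEvery(ctx, 5*time.Millisecond, func() {
		mu.Lock()
		defer mu.Unlock()
		// Pass ctx (the service context), not context.Background(),
		// so queue processing is abandoned on shutdown.
		fmt.Println("processing queue, ctx err:", ctx.Err())
	})

	time.Sleep(12 * time.Millisecond)
	cancel()
	time.Sleep(5 * time.Millisecond)
}
```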

View File

@@ -62,7 +62,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
topic := baseTopic + s.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
s.p2p.SetStreamHandler(topic, func(stream network.Stream) {
ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout)
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()
defer func() {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {

View File

@@ -111,9 +111,9 @@ type Service struct {
}
// NewRegularSync service.
func NewRegularSync(cfg *Config) *Service {
func NewRegularSync(ctx context.Context, cfg *Config) *Service {
rLimiter := newRateLimiter(cfg.P2P)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
r := &Service{
ctx: ctx,
cancel: cancel,

View File

@@ -110,7 +110,7 @@ func (s *Service) subscribeWithBase(base proto.Message, topic string, validator
// Pipeline decodes the incoming subscription data, runs the validation, and handles the
// message.
pipeline := func(msg *pubsub.Message) {
ctx, cancel := context.WithTimeout(context.Background(), pubsubMessageTimeout)
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
ctx, span := trace.StartSpan(ctx, "sync.pubsub")
defer span.End()
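Per-message handling keeps its timeout but derives it from the service context instead of `context.Background()`, so an incoming stream or pubsub message is bounded both by the timeout and by service shutdown, whichever comes first. In miniature:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const messageTimeout = 50 * time.Millisecond // illustrative value

func handle(parent context.Context, handleMsg func(ctx context.Context) error) error {
	// The message context ends at the timeout OR when the parent (service) context ends.
	ctx, cancel := context.WithTimeout(parent, messageTimeout)
	defer cancel()
	return handleMsg(ctx)
}

func main() {
	svcCtx, stop := context.WithCancel(context.Background())
	stop() // simulate the service shutting down

	err := handle(svcCtx, func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	fmt.Println(err) // context.Canceled rather than DeadlineExceeded: shutdown won
}
```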

View File

@@ -142,7 +142,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
return pubsub.ValidationReject
}
parentState, err = state.ProcessSlots(context.Background(), parentState, blk.Block.Slot)
parentState, err = state.ProcessSlots(ctx, parentState, blk.Block.Slot)
if err != nil {
log.Errorf("Could not advance slot to calculate proposer index: %v", err)
return pubsub.ValidationIgnore

View File

@@ -26,7 +26,7 @@ func init() {
logrus.SetLevel(logrus.PanicLevel)
var err error
p, err = p2p.NewService(&p2p.Config{
p, err = p2p.NewService(context.Background(), &p2p.Config{
NoDiscovery: true,
})
if err != nil {
@@ -45,7 +45,7 @@ func init() {
if err := p.Connect(info); err != nil {
panic(errors.Wrap(err, "could not connect to peer"))
}
sync.NewRegularSync(&sync.Config{
sync.NewRegularSync(context.Background(), &sync.Config{
P2P: p,
DB: nil,
AttPool: nil,

View File

@@ -152,7 +152,7 @@ func (s *resubscribeSub) subscribe() Subscription {
retry:
for {
s.lastTry = mclockutil.Now()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
go func() {
rsub, err := s.fn(ctx)
sub = rsub

View File

@@ -6,12 +6,13 @@ package mock
import (
context "context"
reflect "reflect"
types "github.com/gogo/protobuf/types"
gomock "github.com/golang/mock/gomock"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
grpc "google.golang.org/grpc"
metadata "google.golang.org/grpc/metadata"
reflect "reflect"
)
// MockBeaconNodeValidatorClient is a mock of BeaconNodeValidatorClient interface

View File

@@ -75,7 +75,7 @@ func NewSlasherNode(cliCtx *cli.Context) (*SlasherNode, error) {
cmd.ConfigureSlasher(cliCtx)
registry := shared.NewServiceRegistry()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(cliCtx.Context)
slasher := &SlasherNode{
cliCtx: cliCtx,
ctx: ctx,

View File

@@ -24,8 +24,8 @@ type ValidatorStatusMetadata struct {
}
// RunStatusCommand is the entry point to the `validator status` command.
func RunStatusCommand(pubKeys [][]byte, beaconNodeRPCProvider ethpb.BeaconNodeValidatorClient) error {
statuses, err := FetchAccountStatuses(context.Background(), beaconNodeRPCProvider, pubKeys)
func RunStatusCommand(ctx context.Context, pubKeys [][]byte, beaconNodeRPCProvider ethpb.BeaconNodeValidatorClient) error {
statuses, err := FetchAccountStatuses(ctx, beaconNodeRPCProvider, pubKeys)
if err != nil {
return errors.Wrap(err, "could not fetch account statuses from the beacon node")
}

View File

@@ -1,7 +1,6 @@
package v2
import (
"context"
"fmt"
"os"
"strings"
@@ -60,7 +59,7 @@ func SendDepositCli(cliCtx *cli.Context) error {
}
func createDepositConfig(cliCtx *cli.Context, km *derived.Keymanager) (*derived.SendDepositConfig, error) {
pubKeysBytes, err := km.FetchValidatingPublicKeys(context.Background())
pubKeysBytes, err := km.FetchValidatingPublicKeys(cliCtx.Context)
if err != nil {
return nil, errors.Wrap(err, "could not fetch validating public keys")
}

View File

@@ -76,7 +76,6 @@ type ImportAccountsConfig struct {
// new accounts into the Prysm validator wallet. This uses the CLI to extract
// values necessary to run the function.
func ImportAccountsCli(cliCtx *cli.Context) error {
ctx := context.Background()
au := aurora.NewAurora(true)
wallet, err := OpenWalletOrElseCli(cliCtx, func(cliCtx *cli.Context) (*Wallet, error) {
cfg, err := extractWalletCreationConfigFromCli(cliCtx, v2keymanager.Direct)
@@ -137,7 +136,7 @@ func ImportAccountsCli(cliCtx *cli.Context) error {
// specify this value in their filename.
sort.Sort(byDerivationPath(filesInDir))
for _, name := range filesInDir {
keystore, err := readKeystoreFile(ctx, filepath.Join(keysDir, name))
keystore, err := readKeystoreFile(cliCtx.Context, filepath.Join(keysDir, name))
if err != nil && strings.Contains(err.Error(), "could not decode keystore json") {
continue
} else if err != nil {
@@ -146,7 +145,7 @@ func ImportAccountsCli(cliCtx *cli.Context) error {
keystoresImported = append(keystoresImported, keystore)
}
} else {
keystore, err := readKeystoreFile(ctx, keysDir)
keystore, err := readKeystoreFile(cliCtx.Context, keysDir)
if err != nil {
return errors.Wrap(err, "could not import keystore")
}

View File

@@ -41,7 +41,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
if !ok {
return errors.New("could not assert keymanager interface to concrete type")
}
if err := listDirectKeymanagerAccounts(showDepositData, km); err != nil {
if err := listDirectKeymanagerAccounts(cliCtx.Context, showDepositData, km); err != nil {
return errors.Wrap(err, "could not list validator accounts with direct keymanager")
}
case v2keymanager.Derived:
@@ -49,7 +49,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
if !ok {
return errors.New("could not assert keymanager interface to concrete type")
}
if err := listDerivedKeymanagerAccounts(showDepositData, km); err != nil {
if err := listDerivedKeymanagerAccounts(cliCtx.Context, showDepositData, km); err != nil {
return errors.Wrap(err, "could not list validator accounts with derived keymanager")
}
case v2keymanager.Remote:
@@ -57,7 +57,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
if !ok {
return errors.New("could not assert keymanager interface to concrete type")
}
if err := listRemoteKeymanagerAccounts(wallet, km, km.KeymanagerOpts()); err != nil {
if err := listRemoteKeymanagerAccounts(cliCtx.Context, wallet, km, km.KeymanagerOpts()); err != nil {
return errors.Wrap(err, "could not list validator accounts with remote keymanager")
}
default:
@@ -67,6 +67,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
}
func listDirectKeymanagerAccounts(
ctx context.Context,
showDepositData bool,
keymanager *direct.Keymanager,
) error {
@@ -89,7 +90,6 @@ func listDirectKeymanagerAccounts(
"by running `validator accounts-v2 list --show-deposit-data"),
)
ctx := context.Background()
pubKeys, err := keymanager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating public keys")
@@ -113,13 +113,13 @@ func listDirectKeymanagerAccounts(
}
func listDerivedKeymanagerAccounts(
ctx context.Context,
showDepositData bool,
keymanager *derived.Keymanager,
) error {
au := aurora.NewAurora(true)
fmt.Printf("(keymanager kind) %s\n", au.BrightGreen("derived, (HD) hierarchical-deterministic").Bold())
fmt.Printf("(derivation format) %s\n", au.BrightGreen(keymanager.KeymanagerOpts().DerivedPathStructure).Bold())
ctx := context.Background()
validatingPubKeys, err := keymanager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating public keys")
@@ -178,6 +178,7 @@ func listDerivedKeymanagerAccounts(
}
func listRemoteKeymanagerAccounts(
ctx context.Context,
wallet *Wallet,
keymanager v2keymanager.IKeymanager,
opts *remote.KeymanagerOpts,
@@ -188,7 +189,6 @@ func listRemoteKeymanagerAccounts(
"(configuration file path) %s\n",
au.BrightGreen(filepath.Join(wallet.AccountsDir(), KeymanagerConfigFileName)).Bold(),
)
ctx := context.Background()
fmt.Println(" ")
fmt.Printf("%s\n", au.BrightGreen("Configuration options").Bold())
fmt.Println(opts)

View File

@@ -70,7 +70,7 @@ func TestListAccounts_DirectKeymanager(t *testing.T) {
os.Stdout = w
// We call the list direct keymanager accounts function.
require.NoError(t, listDirectKeymanagerAccounts(true /* show deposit data */, keymanager))
require.NoError(t, listDirectKeymanagerAccounts(context.Background(), true /* show deposit data */, keymanager))
require.NoError(t, w.Close())
out, err := ioutil.ReadAll(r)
@@ -201,7 +201,7 @@ func TestListAccounts_DerivedKeymanager(t *testing.T) {
os.Stdout = w
// We call the list direct keymanager accounts function.
require.NoError(t, listDerivedKeymanagerAccounts(true /* show deposit data */, keymanager))
require.NoError(t, listDerivedKeymanagerAccounts(cliCtx.Context, true /* show deposit data */, keymanager))
require.NoError(t, w.Close())
out, err := ioutil.ReadAll(r)
@@ -334,7 +334,7 @@ func TestListAccounts_RemoteKeymanager(t *testing.T) {
},
}
// We call the list remote keymanager accounts function.
require.NoError(t, listRemoteKeymanagerAccounts(wallet, km, km.opts))
require.NoError(t, listRemoteKeymanagerAccounts(context.Background(), wallet, km, km.opts))
require.NoError(t, w.Close())
out, err := ioutil.ReadAll(r)

View File

@@ -188,7 +188,7 @@ func (v *validator) signAtt(ctx context.Context, pubKey [48]byte, data *ethpb.At
if protectingKeymanager, supported := v.keyManager.(keymanager.ProtectingKeyManager); supported {
sig, err = protectingKeymanager.SignAttestation(pubKey, bytesutil.ToBytes32(domain.SignatureDomain), data)
} else {
sig, err = v.keyManager.Sign(pubKey, root)
sig, err = v.keyManager.Sign(ctx, pubKey, root)
}
}
if err != nil {

View File

@@ -142,7 +142,7 @@ func TestAttestToBlockHead_AttestsCorrectly(t *testing.T) {
root, err := helpers.ComputeSigningRoot(expectedAttestation.Data, make([]byte, 32))
require.NoError(t, err)
sig, err := validator.keyManager.Sign(validatorPubKey, root)
sig, err := validator.keyManager.Sign(context.Background(), validatorPubKey, root)
require.NoError(t, err)
expectedAttestation.Signature = sig.Marshal()
if !reflect.DeepEqual(generatedAttestation, expectedAttestation) {

View File

@@ -237,7 +237,7 @@ func (v *validator) signBlock(ctx context.Context, pubKey [48]byte, epoch uint64
if err != nil {
return nil, errors.Wrap(err, signingRootErr)
}
sig, err = v.keyManager.Sign(pubKey, blockRoot)
sig, err = v.keyManager.Sign(ctx, pubKey, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not sign block proposal")
}
@@ -279,7 +279,7 @@ func (v *validator) signVoluntaryExit(ctx context.Context, pubKey [48]byte, exit
return nil, errors.Wrap(err, signExitErr)
}
} else {
sig, err = v.keyManager.Sign(pubKey, exitRoot)
sig, err = v.keyManager.Sign(ctx, pubKey, exitRoot)
if err != nil {
return nil, errors.Wrap(err, signExitErr)
}

View File

@@ -274,7 +274,7 @@ func (v *validator) signObject(
if err != nil {
return nil, err
}
return v.keyManager.Sign(pubKey, root)
return v.keyManager.Sign(ctx, pubKey, root)
}
// ConstructDialOptions constructs a list of grpc dial options

View File

@@ -1,6 +1,8 @@
package v1
import (
"context"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
@@ -38,7 +40,7 @@ func (km *Direct) FetchValidatingKeys() ([][48]byte, error) {
}
// Sign signs a message for the validator to broadcast.
func (km *Direct) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) {
func (km *Direct) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) {
if secretKey, exists := km.secretKeys[pubKey]; exists {
return secretKey.Sign(root[:]), nil
}

View File

@@ -1,6 +1,7 @@
package v1_test
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -41,7 +42,7 @@ func TestDirectListValidatingKeysMultiple(t *testing.T) {
func TestSignNoSuchKey(t *testing.T) {
sks := make([]bls.SecretKey, 0)
direct := keymanager.NewDirect(sks)
_, err := direct.Sign([48]byte{}, [32]byte{})
_, err := direct.Sign(context.Background(), [48]byte{}, [32]byte{})
assert.ErrorContains(t, keymanager.ErrNoSuchKey.Error(), err)
}
@@ -52,7 +53,7 @@ func TestSign(t *testing.T) {
pubKey := bytesutil.ToBytes48(sks[0].PublicKey().Marshal())
msg := [32]byte{}
sig, err := direct.Sign(pubKey, msg)
sig, err := direct.Sign(context.Background(), pubKey, msg)
require.NoError(t, err)
require.Equal(t, true, sig.Verify(sks[0].PublicKey(), bytesutil.FromBytes32(msg)), "Failed to verify generated signature")
}

View File

@@ -1,6 +1,7 @@
package v1
import (
"context"
"errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -22,7 +23,7 @@ type KeyManager interface {
FetchValidatingKeys() ([][48]byte, error)
// Sign signs a message for the validator to broadcast.
// Note that the domain should already be part of the root, but it is passed along for security purposes.
Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error)
Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error)
}
// ProtectingKeyManager provides access to a keymanager that protects its clients from slashing events.
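The keymanager interface change is the same idea at an API boundary: `Sign` now takes a context, and implementations that talk to a remote signer forward it into the RPC instead of opening a fresh `context.Background()` per call. A condensed sketch (the types here are illustrative, not the real `bls` or wallet types):

```go
package example

import "context"

type Signature []byte

// KeyManager is a condensed version of the interface change: Sign gains a ctx.
type KeyManager interface {
	Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (Signature, error)
}

// Local signing ignores the context today but keeps the signature uniform.
type localKM struct{ secret func(root [32]byte) Signature }

func (km *localKM) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (Signature, error) {
	return km.secret(root), nil
}

// Remote signing forwards the caller's context into the outbound request, so
// attestation/proposal deadlines and shutdown cancel the remote call too.
type remoteKM struct {
	call func(ctx context.Context, pubKey [48]byte, root [32]byte) (Signature, error)
}

func (km *remoteKM) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (Signature, error) {
	return km.call(ctx, pubKey, root)
}
```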

View File

@@ -155,12 +155,12 @@ func (km *Remote) FetchValidatingKeys() ([][48]byte, error) {
}
// Sign without protection is not supported by remote keymanagers.
func (km *Remote) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) {
func (km *Remote) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) {
return nil, errors.New("remote keymanager does not support unprotected signing")
}
// SignGeneric signs a generic message for the validator to broadcast.
func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) (bls.Signature, error) {
func (km *Remote) SignGeneric(ctx context.Context, pubKey [48]byte, root [32]byte, domain [32]byte) (bls.Signature, error) {
accountInfo, exists := km.accounts[pubKey]
if !exists {
return nil, ErrNoSuchKey
@@ -172,7 +172,7 @@ func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) (
Data: root[:],
Domain: domain[:],
}
resp, err := client.Sign(context.Background(), req)
resp, err := client.Sign(ctx, req)
if err != nil {
return nil, err
}
@@ -186,7 +186,7 @@ func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) (
}
// SignProposal signs a block proposal for the validator to broadcast.
func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.BeaconBlockHeader) (bls.Signature, error) {
func (km *Remote) SignProposal(ctx context.Context, pubKey [48]byte, domain [32]byte, data *ethpb.BeaconBlockHeader) (bls.Signature, error) {
accountInfo, exists := km.accounts[pubKey]
if !exists {
return nil, ErrNoSuchKey
@@ -204,7 +204,7 @@ func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.Bea
BodyRoot: data.BodyRoot,
},
}
resp, err := client.SignBeaconProposal(context.Background(), req)
resp, err := client.SignBeaconProposal(ctx, req)
if err != nil {
return nil, err
}
@@ -218,7 +218,7 @@ func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.Bea
}
// SignAttestation signs an attestation for the validator to broadcast.
func (km *Remote) SignAttestation(pubKey [48]byte, domain [32]byte, data *ethpb.AttestationData) (bls.Signature, error) {
func (km *Remote) SignAttestation(ctx context.Context, pubKey [48]byte, domain [32]byte, data *ethpb.AttestationData) (bls.Signature, error) {
accountInfo, exists := km.accounts[pubKey]
if !exists {
return nil, ErrNoSuchKey
@@ -242,7 +242,7 @@ func (km *Remote) SignAttestation(pubKey [48]byte, domain [32]byte, data *ethpb.
},
},
}
resp, err := client.SignBeaconAttestation(context.Background(), req)
resp, err := client.SignBeaconAttestation(ctx, req)
if err != nil {
return nil, err
}
@@ -261,7 +261,7 @@ func (km *Remote) RefreshValidatingKeys() error {
listAccountsReq := &pb.ListAccountsRequest{
Paths: km.paths,
}
resp, err := listerClient.ListAccounts(context.Background(), listAccountsReq)
resp, err := listerClient.ListAccounts(context.TODO(), listAccountsReq)
if err != nil {
return err
}

View File

@@ -141,7 +141,7 @@ func (km *Wallet) FetchValidatingKeys() ([][48]byte, error) {
}
// Sign signs a message for the validator to broadcast.
func (km *Wallet) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) {
func (km *Wallet) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) {
account, exists := km.accounts[pubKey]
if !exists {
return nil, ErrNoSuchKey
@@ -151,7 +151,7 @@ func (km *Wallet) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) {
if !ok {
return nil, errors.New("account does not implement the AccountSigner interface")
}
sig, err := signer.Sign(context.Background(), root[:])
sig, err := signer.Sign(ctx, root[:])
if err != nil {
return nil, err
}

View File

@@ -349,7 +349,7 @@ func (dr *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (b
}
func (dr *Keymanager) initializeAccountKeystore(ctx context.Context) error {
encoded, err := dr.wallet.ReadFileAtPath(context.Background(), AccountsPath, accountsKeystoreFileName)
encoded, err := dr.wallet.ReadFileAtPath(ctx, AccountsPath, accountsKeystoreFileName)
if err != nil && strings.Contains(err.Error(), "no files found") {
// If there are no keys to initialize at all, just exit.
return nil

View File

@@ -196,7 +196,7 @@ contract in order to activate the validator client`,
return err
}
}
ctx, cancel := context.WithTimeout(context.Background(), connTimeout)
ctx, cancel := context.WithTimeout(cliCtx.Context, connTimeout)
defer cancel()
dialOpts := client.ConstructDialOptions(
cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name),
@@ -211,7 +211,7 @@ contract in order to activate the validator client`,
log.WithError(err).Errorf("Failed to dial beacon node endpoint at %s", endpoint)
return err
}
err = v1.RunStatusCommand(pubKeys, ethpb.NewBeaconNodeValidatorClient(conn))
err = v1.RunStatusCommand(ctx, pubKeys, ethpb.NewBeaconNodeValidatorClient(conn))
if closed := conn.Close(); closed != nil {
log.WithError(closed).Error("Could not close connection to beacon node")
}
@@ -259,7 +259,7 @@ contract in order to activate the validator client`,
sources := strings.Split(passedSources, ",")
target := cliCtx.String(flags.TargetDirectory.Name)
if err := v1.Merge(context.Background(), sources, target); err != nil {
if err := v1.Merge(cliCtx.Context, sources, target); err != nil {
log.WithError(err).Error("Merging validator data failed")
} else {
log.Info("Merge completed successfully")
@@ -279,7 +279,7 @@ contract in order to activate the validator client`,
source := cliCtx.String(flags.SourceDirectory.Name)
target := cliCtx.String(flags.TargetDirectory.Name)
if err := v1.Split(context.Background(), source, target); err != nil {
if err := v1.Split(cliCtx.Context, source, target); err != nil {
log.WithError(err).Error("Splitting validator data failed")
} else {
log.Info("Split completed successfully")
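The validator CLI commands follow the node: the dial timeout and the status/merge/split helpers are all rooted at `cliCtx.Context`, so interrupting the command cancels the in-flight gRPC work as well. A sketch of the dialing step under those assumptions (the `dial` helper and the timeout value here are hypothetical):

```go
package example

import (
	"context"
	"time"

	"github.com/urfave/cli/v2"
)

const connTimeout = 10 * time.Second // illustrative value

// runStatus sketches the command shape: a bounded dial context derived from the
// CLI context, then the same CLI context driving the RPC helpers.
func runStatus(cliCtx *cli.Context, dial func(ctx context.Context, endpoint string) error) error {
	ctx, cancel := context.WithTimeout(cliCtx.Context, connTimeout)
	defer cancel()

	if err := dial(ctx, "127.0.0.1:4000"); err != nil {
		return err
	}
	// Subsequent helpers (status, merge, split) reuse cliCtx.Context as well.
	return nil
}
```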

View File

@@ -4,7 +4,6 @@
package node
import (
"context"
"fmt"
"io/ioutil"
"os"
@@ -329,7 +328,7 @@ func (s *ValidatorClient) registerClientService(
if err := s.services.FetchService(&sp); err == nil {
protector = sp
}
v, err := client.NewValidatorService(context.Background(), &client.Config{
v, err := client.NewValidatorService(s.cliCtx.Context, &client.Config{
Endpoint: endpoint,
DataDir: dataDir,
KeyManager: keyManager,
@@ -363,7 +362,7 @@ func (s *ValidatorClient) registerSlasherClientService() error {
maxCallRecvMsgSize := s.cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
grpcRetries := s.cliCtx.Uint(flags.GrpcRetriesFlag.Name)
grpcRetryDelay := s.cliCtx.Duration(flags.GrpcRetryDelayFlag.Name)
sp, err := slashing_protection.NewSlashingProtectionService(context.Background(), &slashing_protection.Config{
sp, err := slashing_protection.NewSlashingProtectionService(s.cliCtx.Context, &slashing_protection.Config{
Endpoint: endpoint,
CertFlag: cert,
GrpcMaxCallRecvMsgSizeFlag: maxCallRecvMsgSize,
@@ -385,7 +384,7 @@ func (s *ValidatorClient) registerRPCService(cliCtx *cli.Context) error {
rpcHost := cliCtx.String(flags.RPCHost.Name)
rpcPort := cliCtx.Int(flags.RPCPort.Name)
nodeGatewayEndpoint := cliCtx.String(flags.BeaconRPCGatewayProviderFlag.Name)
server := rpc.NewServer(context.Background(), &rpc.Config{
server := rpc.NewServer(cliCtx.Context, &rpc.Config{
ValDB: s.db,
Host: rpcHost,
Port: fmt.Sprintf("%d", rpcPort),
@@ -407,7 +406,7 @@ func (s *ValidatorClient) registerRPCGatewayService(cliCtx *cli.Context) error {
gatewayAddress := fmt.Sprintf("%s:%d", gatewayHost, gatewayPort)
allowedOrigins := strings.Split(cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",")
gatewaySrv := gateway.New(
context.Background(),
cliCtx.Context,
rpcAddr,
gatewayAddress,
allowedOrigins,