From a74cf5de909ed1d5a4638af9638d7a158fa700f8 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Wed, 9 Sep 2020 04:48:52 -0500 Subject: [PATCH] Replace context.Background() with more appropriate context (#7136) * Replace context.Background() with more appropriate context * replace a few context.TODO * Merge refs/heads/master into fix-ctx * Update validator/accounts/v2/accounts_create.go Co-authored-by: terence tsao * Fix tests * fix stream tests * gofmt * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * fix conflicts and remove ctx background uses * fix broken test * Merge branch 'master' into fix-ctx * imports * Merge branch 'fix-ctx' of github.com:prysmaticlabs/prysm into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * fix conflicts * Merge refs/heads/master into fix-ctx * fmt * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * Merge refs/heads/master into fix-ctx * fixes tests --- beacon-chain/blockchain/receive_attestation.go | 2 +- beacon-chain/blockchain/service.go | 2 +- beacon-chain/core/state/state.go | 2 +- beacon-chain/db/kv/migration_archived_index.go | 2 +- beacon-chain/node/node.go | 8 ++++---- .../attestations/prepare_forkchoice.go | 5 ++--- beacon-chain/p2p/discovery_test.go | 3 ++- beacon-chain/p2p/fork_test.go | 5 +++-- beacon-chain/p2p/handshake.go | 5 ++--- beacon-chain/p2p/pubsub_test.go | 2 +- beacon-chain/p2p/service.go | 4 ++-- beacon-chain/p2p/service_test.go | 10 +++++----- beacon-chain/p2p/subnets_test.go | 2 +- beacon-chain/powchain/deposit.go | 4 ++-- beacon-chain/powchain/deposit_test.go | 14 +++++++------- beacon-chain/powchain/log_processing.go | 2 +- beacon-chain/powchain/service.go | 12 +++++++----- beacon-chain/rpc/validator/proposer_test.go | 2 +- beacon-chain/rpc/validator/server.go | 4 ++-- beacon-chain/rpc/validator/server_test.go | 7 +++++++ beacon-chain/sync/initial-sync/round_robin.go | 2 +- .../sync/initial-sync/round_robin_test.go | 6 ++++-- beacon-chain/sync/initial-sync/service.go | 6 +++--- beacon-chain/sync/pending_attestations_queue.go | 3 +-- beacon-chain/sync/pending_blocks_queue.go | 3 +-- beacon-chain/sync/rpc.go | 2 +- beacon-chain/sync/service.go | 4 ++-- beacon-chain/sync/subscriber.go | 2 +- beacon-chain/sync/validate_beacon_blocks.go | 2 +- fuzz/rpc_status_fuzz.go | 4 ++-- shared/event/subscription.go | 2 +- shared/mock/beacon_validator_client_mock.go | 3 ++- slasher/node/node.go | 2 +- validator/accounts/v1/status.go | 4 ++-- validator/accounts/v2/accounts_deposit.go | 3 +-- validator/accounts/v2/accounts_import.go | 5 ++--- validator/accounts/v2/accounts_list.go | 12 ++++++------ validator/accounts/v2/accounts_list_test.go | 6 +++--- validator/client/attest.go | 2 +- validator/client/attest_test.go | 2 +- validator/client/propose.go | 4 ++-- validator/client/service.go | 2 +- validator/keymanager/v1/direct.go | 4 +++- validator/keymanager/v1/direct_test.go | 5 +++-- validator/keymanager/v1/keymanager.go | 3 ++- validator/keymanager/v1/remote.go | 16 ++++++++-------- validator/keymanager/v1/wallet.go | 4 ++-- validator/keymanager/v2/direct/direct.go | 2 +- validator/main.go | 8 ++++---- validator/node/node.go | 9 ++++----- 50 files changed, 120 insertions(+), 109 deletions(-) diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index e903800aeb..693534bb36 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -111,7 +111,7 @@ func (s *Service) processAttestation(subscribedToStateEvents chan struct{}) { case <-s.ctx.Done(): return case <-st.C(): - ctx := context.Background() + ctx := s.ctx atts := s.attPool.ForkchoiceAttestations() for _, a := range atts { // Based on the spec, don't process the attestation until the subsequent slot. diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 108ee49144..70ca581b2b 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -259,7 +259,7 @@ func (s *Service) initializeBeaconChain( genesisTime time.Time, preGenesisState *stateTrie.BeaconState, eth1data *ethpb.Eth1Data) (*stateTrie.BeaconState, error) { - _, span := trace.StartSpan(context.Background(), "beacon-chain.Service.initializeBeaconChain") + ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain") defer span.End() s.genesisTime = genesisTime unixTime := uint64(genesisTime.Unix()) diff --git a/beacon-chain/core/state/state.go b/beacon-chain/core/state/state.go index 070a9feddc..501947c91a 100644 --- a/beacon-chain/core/state/state.go +++ b/beacon-chain/core/state/state.go @@ -95,7 +95,7 @@ func GenesisBeaconState(deposits []*ethpb.Deposit, genesisTime uint64, eth1Data return nil, err } - state, err = b.ProcessPreGenesisDeposits(context.Background(), state, deposits) + state, err = b.ProcessPreGenesisDeposits(context.TODO(), state, deposits) if err != nil { return nil, errors.Wrap(err, "could not process validator deposits") } diff --git a/beacon-chain/db/kv/migration_archived_index.go b/beacon-chain/db/kv/migration_archived_index.go index 2cd8c19c84..879421b556 100644 --- a/beacon-chain/db/kv/migration_archived_index.go +++ b/beacon-chain/db/kv/migration_archived_index.go @@ -36,7 +36,7 @@ func migrateArchivedIndex(tx *bolt.Tx) error { continue } blk := ðpb.SignedBeaconBlock{} - if err := decode(context.Background(), b, blk); err != nil { + if err := decode(context.TODO(), b, blk); err != nil { return err } if err := tx.Bucket(stateSlotIndicesBucket).Put(bytesutil.Uint64ToBytesBigEndian(blk.Block.Slot), v); err != nil { diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 6b616d3212..93a9b1a03a 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -151,7 +151,7 @@ func NewBeaconNode(cliCtx *cli.Context) (*BeaconNode, error) { registry := shared.NewServiceRegistry() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(cliCtx.Context) beacon := &BeaconNode{ cliCtx: cliCtx, ctx: ctx, @@ -384,7 +384,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { } } - svc, err := p2p.NewService(&p2p.Config{ + svc, err := p2p.NewService(b.ctx, &p2p.Config{ NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name), StaticPeers: sliceutil.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)), BootstrapNodeAddr: bootnodeAddrs, @@ -520,7 +520,7 @@ func (b *BeaconNode) registerSyncService() error { return err } - rs := prysmsync.NewRegularSync(&prysmsync.Config{ + rs := prysmsync.NewRegularSync(b.ctx, &prysmsync.Config{ DB: b.db, P2P: b.fetchP2P(), Chain: chainService, @@ -544,7 +544,7 @@ func (b *BeaconNode) registerInitialSyncService() error { return err } - is := initialsync.NewInitialSync(&initialsync.Config{ + is := initialsync.NewInitialSync(b.ctx, &initialsync.Config{ DB: b.db, Chain: chainService, P2P: b.fetchP2P(), diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 4fbb684b80..1dc3a5f24e 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -24,10 +24,9 @@ func (s *Service) prepareForkChoiceAtts() { ticker := time.NewTicker(prepareForkChoiceAttsPeriod) defer ticker.Stop() for { - ctx := context.Background() select { case <-ticker.C: - if err := s.batchForkChoiceAtts(ctx); err != nil { + if err := s.batchForkChoiceAtts(s.ctx); err != nil { log.WithError(err).Error("Could not prepare attestations for fork choice") } case <-s.ctx.Done(): @@ -41,7 +40,7 @@ func (s *Service) prepareForkChoiceAtts() { // pool. Then finds the common data, aggregate and batch them for fork choice. // The resulting attestations are saved in the fork choice pool. func (s *Service) batchForkChoiceAtts(ctx context.Context) error { - _, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts") + ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts") defer span.End() if err := s.pool.AggregateUnaggregatedAttestations(); err != nil { diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 11c0e81657..40ffa02e4b 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "crypto/ecdsa" "fmt" "math/rand" @@ -180,7 +181,7 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) { cfg.StaticPeers = staticPeers cfg.StateNotifier = &mock.MockStateNotifier{} cfg.NoDiscovery = true - s, err := NewService(cfg) + s, err := NewService(context.Background(), cfg) require.NoError(t, err) exitRoutine := make(chan bool) diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 96b35192ed..cc659d6ccf 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -2,6 +2,7 @@ package p2p import ( "bytes" + "context" "math/rand" "os" "path" @@ -86,7 +87,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { cfg.UDPPort = 14000 cfg.TCPPort = 14001 cfg.MaxPeers = 30 - s, err = NewService(cfg) + s, err = NewService(context.Background(), cfg) require.NoError(t, err) s.genesisTime = genesisTime s.genesisValidatorsRoot = make([]byte, 32) @@ -174,7 +175,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { cfg.UDPPort = 14000 cfg.TCPPort = 14001 cfg.MaxPeers = 30 - s, err = NewService(cfg) + s, err = NewService(context.Background(), cfg) require.NoError(t, err) s.genesisTime = genesisTime diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 42fa7ecbca..4f00ce9fed 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -131,7 +131,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer } s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting) - if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF { + if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && err != io.EOF { log.WithError(err).Trace("Handshake failed") disconnectFromPeer() return @@ -160,8 +160,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p priorState = peers.PeerDisconnected } s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) - ctx := context.Background() - if err := handler(ctx, conn.RemotePeer()); err != nil { + if err := handler(context.TODO(), conn.RemotePeer()); err != nil { log.WithError(err).Error("Disconnect handler failed") } s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) diff --git a/beacon-chain/p2p/pubsub_test.go b/beacon-chain/p2p/pubsub_test.go index d55e73175e..fb76de182b 100644 --- a/beacon-chain/p2p/pubsub_test.go +++ b/beacon-chain/p2p/pubsub_test.go @@ -14,7 +14,7 @@ import ( ) func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) { - s, err := NewService(&Config{}) + s, err := NewService(context.Background(), &Config{}) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index c95a89b2a7..2490d758b0 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -88,9 +88,9 @@ type Service struct { // NewService initializes a new p2p service compatible with shared.Service interface. No // connections are made until the Start function is called during the service registry startup. -func NewService(cfg *Config) (*Service, error) { +func NewService(ctx context.Context, cfg *Config) (*Service, error) { var err error - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) _ = cancel // govet fix for lost cancel. Cancel is handled in service.Stop(). cache, err := ristretto.NewCache(&ristretto.Config{ NumCounters: cacheNumCounters, diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index 96e3437b6e..e8b740df04 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -71,7 +71,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) { } func TestService_Stop_SetsStartedToFalse(t *testing.T) { - s, err := NewService(&Config{}) + s, err := NewService(context.Background(), &Config{}) require.NoError(t, err) s.started = true s.dv5Listener = &mockListener{} @@ -80,7 +80,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) { } func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) { - s, err := NewService(&Config{}) + s, err := NewService(context.Background(), &Config{}) require.NoError(t, err) assert.NoError(t, s.Stop()) } @@ -92,7 +92,7 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) { TCPPort: 2000, UDPPort: 2000, } - s, err := NewService(cfg) + s, err := NewService(context.Background(), cfg) require.NoError(t, err) s.stateNotifier = &mock.MockStateNotifier{} s.dv5Listener = &mockListener{} @@ -193,7 +193,7 @@ func TestListenForNewNodes(t *testing.T) { cfg.UDPPort = 14000 cfg.TCPPort = 14001 - s, err = NewService(cfg) + s, err = NewService(context.Background(), cfg) require.NoError(t, err) s.stateNotifier = &mock.MockStateNotifier{} exitRoutine := make(chan bool) @@ -249,7 +249,7 @@ func TestPeer_Disconnect(t *testing.T) { } func TestService_JoinLeaveTopic(t *testing.T) { - s, err := NewService(&Config{}) + s, err := NewService(context.Background(), &Config{}) require.NoError(t, err) assert.Equal(t, 0, len(s.joinedTopics)) diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index 6b82f0e525..d2a40b9fe5 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -79,7 +79,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { UDPPort: uint(port), } cfg.StateNotifier = &mock.MockStateNotifier{} - s, err = NewService(cfg) + s, err = NewService(context.Background(), cfg) require.NoError(t, err) exitRoutine := make(chan bool) go func() { diff --git a/beacon-chain/powchain/deposit.go b/beacon-chain/powchain/deposit.go index 68040edf49..422635516b 100644 --- a/beacon-chain/powchain/deposit.go +++ b/beacon-chain/powchain/deposit.go @@ -7,11 +7,11 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" ) -func (s *Service) processDeposit(eth1Data *ethpb.Eth1Data, deposit *ethpb.Deposit) error { +func (s *Service) processDeposit(ctx context.Context, eth1Data *ethpb.Eth1Data, deposit *ethpb.Deposit) error { var err error if err := s.preGenesisState.SetEth1Data(eth1Data); err != nil { return err } - s.preGenesisState, err = blocks.ProcessPreGenesisDeposits(context.Background(), s.preGenesisState, []*ethpb.Deposit{deposit}) + s.preGenesisState, err = blocks.ProcessPreGenesisDeposits(ctx, s.preGenesisState, []*ethpb.Deposit{deposit}) return err } diff --git a/beacon-chain/powchain/deposit_test.go b/beacon-chain/powchain/deposit_test.go index 0163bf8ad9..9540269278 100644 --- a/beacon-chain/powchain/deposit_test.go +++ b/beacon-chain/powchain/deposit_test.go @@ -36,7 +36,7 @@ func TestProcessDeposit_OK(t *testing.T) { eth1Data, err := testutil.DeterministicEth1Data(len(deposits)) require.NoError(t, err) - err = web3Service.processDeposit(eth1Data, deposits[0]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0]) require.NoError(t, err, "could not process deposit") valcount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0) @@ -61,7 +61,7 @@ func TestProcessDeposit_InvalidMerkleBranch(t *testing.T) { deposits[0].Proof = [][]byte{{'f', 'a', 'k', 'e'}} - err = web3Service.processDeposit(eth1Data, deposits[0]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0]) require.NotNil(t, err, "No errors, when an error was expected") want := "deposit merkle branch of deposit root did not verify for root" @@ -99,7 +99,7 @@ func TestProcessDeposit_InvalidPublicKey(t *testing.T) { DepositRoot: root[:], } - err = web3Service.processDeposit(eth1Data, deposits[0]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0]) require.NoError(t, err) require.LogsContain(t, hook, pubKeyErr) @@ -134,7 +134,7 @@ func TestProcessDeposit_InvalidSignature(t *testing.T) { DepositRoot: root[:], } - err = web3Service.processDeposit(eth1Data, deposits[0]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0]) require.NoError(t, err) require.LogsContain(t, hook, pubKeyErr) @@ -166,7 +166,7 @@ func TestProcessDeposit_UnableToVerify(t *testing.T) { proof, err := trie.MerkleProof(0) require.NoError(t, err) deposits[0].Proof = proof - err = web3Service.processDeposit(eth1Data, deposits[0]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[0]) require.NoError(t, err) want := "signature did not verify" @@ -225,7 +225,7 @@ func TestProcessDeposit_IncompleteDeposit(t *testing.T) { deposit.Proof, err = trie.MerkleProof(i) require.NoError(t, err) - err = web3Service.processDeposit(eth1Data, deposit) + err = web3Service.processDeposit(context.Background(), eth1Data, deposit) require.NoError(t, err, fmt.Sprintf("Could not process deposit at %d", i)) valcount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0) @@ -251,7 +251,7 @@ func TestProcessDeposit_AllDepositedSuccessfully(t *testing.T) { for i := range keys { eth1Data.DepositCount = uint64(i + 1) - err = web3Service.processDeposit(eth1Data, deposits[i]) + err = web3Service.processDeposit(context.Background(), eth1Data, deposits[i]) require.NoError(t, err, fmt.Sprintf("Could not process deposit at %d", i)) valCount, err := helpers.ActiveValidatorCount(web3Service.preGenesisState, 0) diff --git a/beacon-chain/powchain/log_processing.go b/beacon-chain/powchain/log_processing.go index ed440bfafd..d983ad5b16 100644 --- a/beacon-chain/powchain/log_processing.go +++ b/beacon-chain/powchain/log_processing.go @@ -171,7 +171,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo DepositRoot: root[:], DepositCount: uint64(len(s.chainStartData.ChainstartDeposits)), } - if err := s.processDeposit(eth1Data, deposit); err != nil { + if err := s.processDeposit(ctx, eth1Data, deposit); err != nil { log.Errorf("Invalid deposit processed: %v", err) validData = false } diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index ae8ae90e13..2a7c322a32 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -327,7 +327,7 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) { return false, errors.Wrap(err, "could not get deposit count") } count := bytesutil.FromBytes8(countByte) - deposits := s.depositCache.AllDeposits(context.TODO(), nil) + deposits := s.depositCache.AllDeposits(s.ctx, nil) if count != uint64(len(deposits)) { return false, nil } @@ -606,6 +606,7 @@ func safelyHandlePanic() { func (s *Service) handleETH1FollowDistance() { defer safelyHandlePanic() + ctx := s.ctx // use a 5 minutes timeout for block time, because the max mining time is 278 sec (block 7208027) // (analyzed the time of the block from 2018-09-01 to 2019-02-13) @@ -615,7 +616,7 @@ func (s *Service) handleETH1FollowDistance() { log.Warn("eth1 client is not syncing") } if !s.chainStartData.Chainstarted { - if err := s.checkBlockNumberForChainStart(context.Background(), big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil { + if err := s.checkBlockNumberForChainStart(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil { s.runError = err log.Error(err) return @@ -627,7 +628,7 @@ func (s *Service) handleETH1FollowDistance() { if s.latestEth1Data.LastRequestedBlock == s.latestEth1Data.BlockHeight { return } - if err := s.requestBatchedLogs(context.Background()); err != nil { + if err := s.requestBatchedLogs(ctx); err != nil { s.runError = err log.Error(err) return @@ -646,6 +647,7 @@ func (s *Service) initPOWService() { case <-s.ctx.Done(): return default: + ctx := s.ctx err := s.initDataFromContract() if err != nil { log.Errorf("Unable to retrieve data from deposit contract %v", err) @@ -653,7 +655,7 @@ func (s *Service) initPOWService() { continue } - header, err := s.eth1DataFetcher.HeaderByNumber(context.Background(), nil) + header, err := s.eth1DataFetcher.HeaderByNumber(ctx, nil) if err != nil { log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err) s.retryETH1Node(err) @@ -664,7 +666,7 @@ func (s *Service) initPOWService() { s.latestEth1Data.BlockHash = header.Hash().Bytes() s.latestEth1Data.BlockTime = header.Time - if err := s.processPastLogs(context.Background()); err != nil { + if err := s.processPastLogs(ctx); err != nil { log.Errorf("Unable to process past logs %v", err) s.retryETH1Node(err) continue diff --git a/beacon-chain/rpc/validator/proposer_test.go b/beacon-chain/rpc/validator/proposer_test.go index bdb37cce76..ce45d48ac6 100644 --- a/beacon-chain/rpc/validator/proposer_test.go +++ b/beacon-chain/rpc/validator/proposer_test.go @@ -160,7 +160,7 @@ func TestGetBlock_AddsUnaggregatedAtts(t *testing.T) { // we don't care for the purpose of this test. var atts []*ethpb.Attestation for i := uint64(0); len(atts) < int(params.BeaconConfig().MaxAttestations); i++ { - a, err := testutil.GenerateAttestations(beaconState, privKeys, 2, 1, true) + a, err := testutil.GenerateAttestations(beaconState, privKeys, 4, 1, true) require.NoError(t, err) atts = append(atts, a...) } diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index f59ab97ef6..68832a88a9 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -158,7 +158,7 @@ func (vs *Server) CanonicalHead(ctx context.Context, req *ptypes.Empty) (*ethpb. // subscribes to an event stream triggered by the powchain service whenever the ChainStart log does // occur in the Deposit Contract on ETH 1.0. func (vs *Server) WaitForChainStart(req *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForChainStartServer) error { - head, err := vs.HeadFetcher.HeadState(context.Background()) + head, err := vs.HeadFetcher.HeadState(stream.Context()) if err != nil { return status.Errorf(codes.Internal, "Could not retrieve head state: %v", err) } @@ -213,7 +213,7 @@ func (vs *Server) WaitForChainStart(req *ptypes.Empty, stream ethpb.BeaconNodeVa // WaitForSynced subscribes to the state channel and ends the stream when the state channel // indicates the beacon node has been initialized and is ready func (vs *Server) WaitForSynced(req *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForSyncedServer) error { - head, err := vs.HeadFetcher.HeadState(context.Background()) + head, err := vs.HeadFetcher.HeadState(stream.Context()) if err != nil { return status.Errorf(codes.Internal, "Could not retrieve head state: %v", err) } diff --git a/beacon-chain/rpc/validator/server_test.go b/beacon-chain/rpc/validator/server_test.go index c209642257..694db57560 100644 --- a/beacon-chain/rpc/validator/server_test.go +++ b/beacon-chain/rpc/validator/server_test.go @@ -300,6 +300,7 @@ func TestWaitForChainStart_ContextClosed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := mock.NewMockBeaconNodeValidator_WaitForChainStartServer(ctrl) + mockStream.EXPECT().Context().Return(ctx) go func(tt *testing.T) { err := Server.WaitForChainStart(&ptypes.Empty{}, mockStream) assert.ErrorContains(tt, "Context canceled", err) @@ -337,6 +338,7 @@ func TestWaitForChainStart_AlreadyStarted(t *testing.T) { GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) + mockStream.EXPECT().Context().Return(context.Background()) assert.NoError(t, Server.WaitForChainStart(&ptypes.Empty{}, mockStream), "Could not call RPC method") } @@ -359,6 +361,7 @@ func TestWaitForChainStart_HeadStateDoesNotExist(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := mock.NewMockBeaconNodeValidator_WaitForChainStartServer(ctrl) + mockStream.EXPECT().Context().Return(context.Background()) wg := new(sync.WaitGroup) wg.Add(1) @@ -402,6 +405,7 @@ func TestWaitForChainStart_NotStartedThenLogFired(t *testing.T) { GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) + mockStream.EXPECT().Context().Return(context.Background()) go func(tt *testing.T) { assert.NoError(tt, Server.WaitForChainStart(&ptypes.Empty{}, mockStream)) <-exitRoutine @@ -441,6 +445,7 @@ func TestWaitForSynced_ContextClosed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockStream := mock.NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl) + mockStream.EXPECT().Context().Return(context.Background()) go func(tt *testing.T) { err := Server.WaitForSynced(&ptypes.Empty{}, mockStream) assert.ErrorContains(tt, "Context canceled", err) @@ -479,6 +484,7 @@ func TestWaitForSynced_AlreadySynced(t *testing.T) { GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) + mockStream.EXPECT().Context().Return(context.Background()) assert.NoError(t, Server.WaitForSynced(&ptypes.Empty{}, mockStream), "Could not call RPC method") } @@ -506,6 +512,7 @@ func TestWaitForSynced_NotStartedThenLogFired(t *testing.T) { GenesisTime: uint64(time.Unix(0, 0).Unix()), }, ).Return(nil) + mockStream.EXPECT().Context().Return(context.Background()) go func(tt *testing.T) { assert.NoError(tt, Server.WaitForSynced(&ptypes.Empty{}, mockStream), "Could not call RPC method") <-exitRoutine diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 39463942ac..6628034d18 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -41,7 +41,7 @@ type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBloc // after the finalized epoch, request blocks to head from some subset of peers // where step = 1. func (s *Service) roundRobinSync(genesis time.Time) error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(s.ctx) defer cancel() defer s.chain.ClearCachedStates() state.SkipSlotCache.Disable() diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index c5cc8c994a..037c105cde 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -298,6 +298,7 @@ func TestService_roundRobinSync(t *testing.T) { }, } // no-op mock s := &Service{ + ctx: context.Background(), chain: mc, p2p: p, db: beaconDB, @@ -329,7 +330,7 @@ func TestService_processBlock(t *testing.T) { err = beaconDB.SaveBlock(context.Background(), genesisBlk) require.NoError(t, err) st := testutil.NewBeaconState() - s := NewInitialSync(&Config{ + s := NewInitialSync(context.Background(), &Config{ P2P: p2pt.NewTestP2P(t), DB: beaconDB, Chain: &mock.ChainService{ @@ -388,7 +389,7 @@ func TestService_processBlockBatch(t *testing.T) { err = beaconDB.SaveBlock(context.Background(), genesisBlk) require.NoError(t, err) st := testutil.NewBeaconState() - s := NewInitialSync(&Config{ + s := NewInitialSync(context.Background(), &Config{ P2P: p2pt.NewTestP2P(t), DB: beaconDB, Chain: &mock.ChainService{ @@ -527,6 +528,7 @@ func TestService_blockProviderScoring(t *testing.T) { }, } // no-op mock s := &Service{ + ctx: context.Background(), chain: mc, p2p: p, db: beaconDB, diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 825ba571a2..6a18469e50 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -58,8 +58,8 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. -func NewInitialSync(cfg *Config) *Service { - ctx, cancel := context.WithCancel(context.Background()) +func NewInitialSync(ctx context.Context, cfg *Config) *Service { + ctx, cancel := context.WithCancel(ctx) return &Service{ ctx: ctx, cancel: cancel, @@ -192,7 +192,7 @@ func (s *Service) Resync() error { // set it to false since we are syncing again s.synced = false defer func() { s.synced = true }() // Reset it at the end of the method. - headState, err := s.chain.HeadState(context.Background()) + headState, err := s.chain.HeadState(s.ctx) if err != nil { return errors.Wrap(err, "could not retrieve head state") } diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index a9c2b855cb..68cc2f3e0e 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -23,12 +23,11 @@ var processPendingAttsPeriod = slotutil.DivideSlotBy(2 /* twice per slot */) // This processes pending attestation queues on every `processPendingAttsPeriod`. func (s *Service) processPendingAttsQueue() { - ctx := context.Background() // Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data. mutex := new(sync.Mutex) runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() { mutex.Lock() - if err := s.processPendingAtts(ctx); err != nil { + if err := s.processPendingAtts(s.ctx); err != nil { log.WithError(err).Debugf("Could not process pending attestation: %v", err) } mutex.Unlock() diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 3a1e45e9cf..9008634c64 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -28,12 +28,11 @@ const numOfTries = 5 // processes pending blocks queue on every processPendingBlocksPeriod func (s *Service) processPendingBlocksQueue() { - ctx := context.Background() // Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data. locker := new(sync.Mutex) runutil.RunEvery(s.ctx, processPendingBlocksPeriod, func() { locker.Lock() - if err := s.processPendingBlocks(ctx); err != nil { + if err := s.processPendingBlocks(s.ctx); err != nil { log.WithError(err).Debug("Failed to process pending blocks") } locker.Unlock() diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index a827472bc4..cf585ef789 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -62,7 +62,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { topic := baseTopic + s.p2p.Encoding().ProtocolSuffix() log := log.WithField("topic", topic) s.p2p.SetStreamHandler(topic, func(stream network.Stream) { - ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout) + ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout) defer cancel() defer func() { if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 7b90698bc5..6ad3e1ea5e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -111,9 +111,9 @@ type Service struct { } // NewRegularSync service. -func NewRegularSync(cfg *Config) *Service { +func NewRegularSync(ctx context.Context, cfg *Config) *Service { rLimiter := newRateLimiter(cfg.P2P) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) r := &Service{ ctx: ctx, cancel: cancel, diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 3c1d62459f..467bd09188 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -110,7 +110,7 @@ func (s *Service) subscribeWithBase(base proto.Message, topic string, validator // Pipeline decodes the incoming subscription data, runs the validation, and handles the // message. pipeline := func(msg *pubsub.Message) { - ctx, cancel := context.WithTimeout(context.Background(), pubsubMessageTimeout) + ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout) defer cancel() ctx, span := trace.StartSpan(ctx, "sync.pubsub") defer span.End() diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 47ba857085..f8eeb1b427 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -142,7 +142,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms return pubsub.ValidationReject } - parentState, err = state.ProcessSlots(context.Background(), parentState, blk.Block.Slot) + parentState, err = state.ProcessSlots(ctx, parentState, blk.Block.Slot) if err != nil { log.Errorf("Could not advance slot to calculate proposer index: %v", err) return pubsub.ValidationIgnore diff --git a/fuzz/rpc_status_fuzz.go b/fuzz/rpc_status_fuzz.go index 7be32979c0..8fffada5f1 100644 --- a/fuzz/rpc_status_fuzz.go +++ b/fuzz/rpc_status_fuzz.go @@ -26,7 +26,7 @@ func init() { logrus.SetLevel(logrus.PanicLevel) var err error - p, err = p2p.NewService(&p2p.Config{ + p, err = p2p.NewService(context.Background(), &p2p.Config{ NoDiscovery: true, }) if err != nil { @@ -45,7 +45,7 @@ func init() { if err := p.Connect(info); err != nil { panic(errors.Wrap(err, "could not connect to peer")) } - sync.NewRegularSync(&sync.Config{ + sync.NewRegularSync(context.Background(), &sync.Config{ P2P: p, DB: nil, AttPool: nil, diff --git a/shared/event/subscription.go b/shared/event/subscription.go index 903e13892a..61a0e6889f 100644 --- a/shared/event/subscription.go +++ b/shared/event/subscription.go @@ -152,7 +152,7 @@ func (s *resubscribeSub) subscribe() Subscription { retry: for { s.lastTry = mclockutil.Now() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) go func() { rsub, err := s.fn(ctx) sub = rsub diff --git a/shared/mock/beacon_validator_client_mock.go b/shared/mock/beacon_validator_client_mock.go index a1894af3ba..b438493620 100644 --- a/shared/mock/beacon_validator_client_mock.go +++ b/shared/mock/beacon_validator_client_mock.go @@ -6,12 +6,13 @@ package mock import ( context "context" + reflect "reflect" + types "github.com/gogo/protobuf/types" gomock "github.com/golang/mock/gomock" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" grpc "google.golang.org/grpc" metadata "google.golang.org/grpc/metadata" - reflect "reflect" ) // MockBeaconNodeValidatorClient is a mock of BeaconNodeValidatorClient interface diff --git a/slasher/node/node.go b/slasher/node/node.go index 29fffdb6b4..e8bad47a01 100644 --- a/slasher/node/node.go +++ b/slasher/node/node.go @@ -75,7 +75,7 @@ func NewSlasherNode(cliCtx *cli.Context) (*SlasherNode, error) { cmd.ConfigureSlasher(cliCtx) registry := shared.NewServiceRegistry() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(cliCtx.Context) slasher := &SlasherNode{ cliCtx: cliCtx, ctx: ctx, diff --git a/validator/accounts/v1/status.go b/validator/accounts/v1/status.go index c61fc6fb94..c7e091aa58 100644 --- a/validator/accounts/v1/status.go +++ b/validator/accounts/v1/status.go @@ -24,8 +24,8 @@ type ValidatorStatusMetadata struct { } // RunStatusCommand is the entry point to the `validator status` command. -func RunStatusCommand(pubKeys [][]byte, beaconNodeRPCProvider ethpb.BeaconNodeValidatorClient) error { - statuses, err := FetchAccountStatuses(context.Background(), beaconNodeRPCProvider, pubKeys) +func RunStatusCommand(ctx context.Context, pubKeys [][]byte, beaconNodeRPCProvider ethpb.BeaconNodeValidatorClient) error { + statuses, err := FetchAccountStatuses(ctx, beaconNodeRPCProvider, pubKeys) if err != nil { return errors.Wrap(err, "could not fetch account statuses from the beacon node") } diff --git a/validator/accounts/v2/accounts_deposit.go b/validator/accounts/v2/accounts_deposit.go index 0baa94dacb..77c2d7214b 100644 --- a/validator/accounts/v2/accounts_deposit.go +++ b/validator/accounts/v2/accounts_deposit.go @@ -1,7 +1,6 @@ package v2 import ( - "context" "fmt" "os" "strings" @@ -60,7 +59,7 @@ func SendDepositCli(cliCtx *cli.Context) error { } func createDepositConfig(cliCtx *cli.Context, km *derived.Keymanager) (*derived.SendDepositConfig, error) { - pubKeysBytes, err := km.FetchValidatingPublicKeys(context.Background()) + pubKeysBytes, err := km.FetchValidatingPublicKeys(cliCtx.Context) if err != nil { return nil, errors.Wrap(err, "could not fetch validating public keys") } diff --git a/validator/accounts/v2/accounts_import.go b/validator/accounts/v2/accounts_import.go index bce31b41e8..269cb35912 100644 --- a/validator/accounts/v2/accounts_import.go +++ b/validator/accounts/v2/accounts_import.go @@ -76,7 +76,6 @@ type ImportAccountsConfig struct { // new accounts into the Prysm validator wallet. This uses the CLI to extract // values necessary to run the function. func ImportAccountsCli(cliCtx *cli.Context) error { - ctx := context.Background() au := aurora.NewAurora(true) wallet, err := OpenWalletOrElseCli(cliCtx, func(cliCtx *cli.Context) (*Wallet, error) { cfg, err := extractWalletCreationConfigFromCli(cliCtx, v2keymanager.Direct) @@ -137,7 +136,7 @@ func ImportAccountsCli(cliCtx *cli.Context) error { // specify this value in their filename. sort.Sort(byDerivationPath(filesInDir)) for _, name := range filesInDir { - keystore, err := readKeystoreFile(ctx, filepath.Join(keysDir, name)) + keystore, err := readKeystoreFile(cliCtx.Context, filepath.Join(keysDir, name)) if err != nil && strings.Contains(err.Error(), "could not decode keystore json") { continue } else if err != nil { @@ -146,7 +145,7 @@ func ImportAccountsCli(cliCtx *cli.Context) error { keystoresImported = append(keystoresImported, keystore) } } else { - keystore, err := readKeystoreFile(ctx, keysDir) + keystore, err := readKeystoreFile(cliCtx.Context, keysDir) if err != nil { return errors.Wrap(err, "could not import keystore") } diff --git a/validator/accounts/v2/accounts_list.go b/validator/accounts/v2/accounts_list.go index eee13f890e..fea79e95b1 100644 --- a/validator/accounts/v2/accounts_list.go +++ b/validator/accounts/v2/accounts_list.go @@ -41,7 +41,7 @@ func ListAccountsCli(cliCtx *cli.Context) error { if !ok { return errors.New("could not assert keymanager interface to concrete type") } - if err := listDirectKeymanagerAccounts(showDepositData, km); err != nil { + if err := listDirectKeymanagerAccounts(cliCtx.Context, showDepositData, km); err != nil { return errors.Wrap(err, "could not list validator accounts with direct keymanager") } case v2keymanager.Derived: @@ -49,7 +49,7 @@ func ListAccountsCli(cliCtx *cli.Context) error { if !ok { return errors.New("could not assert keymanager interface to concrete type") } - if err := listDerivedKeymanagerAccounts(showDepositData, km); err != nil { + if err := listDerivedKeymanagerAccounts(cliCtx.Context, showDepositData, km); err != nil { return errors.Wrap(err, "could not list validator accounts with derived keymanager") } case v2keymanager.Remote: @@ -57,7 +57,7 @@ func ListAccountsCli(cliCtx *cli.Context) error { if !ok { return errors.New("could not assert keymanager interface to concrete type") } - if err := listRemoteKeymanagerAccounts(wallet, km, km.KeymanagerOpts()); err != nil { + if err := listRemoteKeymanagerAccounts(cliCtx.Context, wallet, km, km.KeymanagerOpts()); err != nil { return errors.Wrap(err, "could not list validator accounts with remote keymanager") } default: @@ -67,6 +67,7 @@ func ListAccountsCli(cliCtx *cli.Context) error { } func listDirectKeymanagerAccounts( + ctx context.Context, showDepositData bool, keymanager *direct.Keymanager, ) error { @@ -89,7 +90,6 @@ func listDirectKeymanagerAccounts( "by running `validator accounts-v2 list --show-deposit-data"), ) - ctx := context.Background() pubKeys, err := keymanager.FetchValidatingPublicKeys(ctx) if err != nil { return errors.Wrap(err, "could not fetch validating public keys") @@ -113,13 +113,13 @@ func listDirectKeymanagerAccounts( } func listDerivedKeymanagerAccounts( + ctx context.Context, showDepositData bool, keymanager *derived.Keymanager, ) error { au := aurora.NewAurora(true) fmt.Printf("(keymanager kind) %s\n", au.BrightGreen("derived, (HD) hierarchical-deterministic").Bold()) fmt.Printf("(derivation format) %s\n", au.BrightGreen(keymanager.KeymanagerOpts().DerivedPathStructure).Bold()) - ctx := context.Background() validatingPubKeys, err := keymanager.FetchValidatingPublicKeys(ctx) if err != nil { return errors.Wrap(err, "could not fetch validating public keys") @@ -178,6 +178,7 @@ func listDerivedKeymanagerAccounts( } func listRemoteKeymanagerAccounts( + ctx context.Context, wallet *Wallet, keymanager v2keymanager.IKeymanager, opts *remote.KeymanagerOpts, @@ -188,7 +189,6 @@ func listRemoteKeymanagerAccounts( "(configuration file path) %s\n", au.BrightGreen(filepath.Join(wallet.AccountsDir(), KeymanagerConfigFileName)).Bold(), ) - ctx := context.Background() fmt.Println(" ") fmt.Printf("%s\n", au.BrightGreen("Configuration options").Bold()) fmt.Println(opts) diff --git a/validator/accounts/v2/accounts_list_test.go b/validator/accounts/v2/accounts_list_test.go index 2b9f7ff6e5..91cf785849 100644 --- a/validator/accounts/v2/accounts_list_test.go +++ b/validator/accounts/v2/accounts_list_test.go @@ -70,7 +70,7 @@ func TestListAccounts_DirectKeymanager(t *testing.T) { os.Stdout = w // We call the list direct keymanager accounts function. - require.NoError(t, listDirectKeymanagerAccounts(true /* show deposit data */, keymanager)) + require.NoError(t, listDirectKeymanagerAccounts(context.Background(), true /* show deposit data */, keymanager)) require.NoError(t, w.Close()) out, err := ioutil.ReadAll(r) @@ -201,7 +201,7 @@ func TestListAccounts_DerivedKeymanager(t *testing.T) { os.Stdout = w // We call the list direct keymanager accounts function. - require.NoError(t, listDerivedKeymanagerAccounts(true /* show deposit data */, keymanager)) + require.NoError(t, listDerivedKeymanagerAccounts(cliCtx.Context, true /* show deposit data */, keymanager)) require.NoError(t, w.Close()) out, err := ioutil.ReadAll(r) @@ -334,7 +334,7 @@ func TestListAccounts_RemoteKeymanager(t *testing.T) { }, } // We call the list remote keymanager accounts function. - require.NoError(t, listRemoteKeymanagerAccounts(wallet, km, km.opts)) + require.NoError(t, listRemoteKeymanagerAccounts(context.Background(), wallet, km, km.opts)) require.NoError(t, w.Close()) out, err := ioutil.ReadAll(r) diff --git a/validator/client/attest.go b/validator/client/attest.go index 2d5d23b2c6..8b9a5e41ae 100644 --- a/validator/client/attest.go +++ b/validator/client/attest.go @@ -188,7 +188,7 @@ func (v *validator) signAtt(ctx context.Context, pubKey [48]byte, data *ethpb.At if protectingKeymanager, supported := v.keyManager.(keymanager.ProtectingKeyManager); supported { sig, err = protectingKeymanager.SignAttestation(pubKey, bytesutil.ToBytes32(domain.SignatureDomain), data) } else { - sig, err = v.keyManager.Sign(pubKey, root) + sig, err = v.keyManager.Sign(ctx, pubKey, root) } } if err != nil { diff --git a/validator/client/attest_test.go b/validator/client/attest_test.go index 10fb51ce79..a04620c0b0 100644 --- a/validator/client/attest_test.go +++ b/validator/client/attest_test.go @@ -142,7 +142,7 @@ func TestAttestToBlockHead_AttestsCorrectly(t *testing.T) { root, err := helpers.ComputeSigningRoot(expectedAttestation.Data, make([]byte, 32)) require.NoError(t, err) - sig, err := validator.keyManager.Sign(validatorPubKey, root) + sig, err := validator.keyManager.Sign(context.Background(), validatorPubKey, root) require.NoError(t, err) expectedAttestation.Signature = sig.Marshal() if !reflect.DeepEqual(generatedAttestation, expectedAttestation) { diff --git a/validator/client/propose.go b/validator/client/propose.go index cfdc24b55e..2befc682d8 100644 --- a/validator/client/propose.go +++ b/validator/client/propose.go @@ -237,7 +237,7 @@ func (v *validator) signBlock(ctx context.Context, pubKey [48]byte, epoch uint64 if err != nil { return nil, errors.Wrap(err, signingRootErr) } - sig, err = v.keyManager.Sign(pubKey, blockRoot) + sig, err = v.keyManager.Sign(ctx, pubKey, blockRoot) if err != nil { return nil, errors.Wrap(err, "could not sign block proposal") } @@ -279,7 +279,7 @@ func (v *validator) signVoluntaryExit(ctx context.Context, pubKey [48]byte, exit return nil, errors.Wrap(err, signExitErr) } } else { - sig, err = v.keyManager.Sign(pubKey, exitRoot) + sig, err = v.keyManager.Sign(ctx, pubKey, exitRoot) if err != nil { return nil, errors.Wrap(err, signExitErr) } diff --git a/validator/client/service.go b/validator/client/service.go index 7ec94b3dbd..3b9eadabfb 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -274,7 +274,7 @@ func (v *validator) signObject( if err != nil { return nil, err } - return v.keyManager.Sign(pubKey, root) + return v.keyManager.Sign(ctx, pubKey, root) } // ConstructDialOptions constructs a list of grpc dial options diff --git a/validator/keymanager/v1/direct.go b/validator/keymanager/v1/direct.go index 9b06a119ea..dffae8affa 100644 --- a/validator/keymanager/v1/direct.go +++ b/validator/keymanager/v1/direct.go @@ -1,6 +1,8 @@ package v1 import ( + "context" + "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" ) @@ -38,7 +40,7 @@ func (km *Direct) FetchValidatingKeys() ([][48]byte, error) { } // Sign signs a message for the validator to broadcast. -func (km *Direct) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) { +func (km *Direct) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) { if secretKey, exists := km.secretKeys[pubKey]; exists { return secretKey.Sign(root[:]), nil } diff --git a/validator/keymanager/v1/direct_test.go b/validator/keymanager/v1/direct_test.go index 70d093aae0..dc9612adfb 100644 --- a/validator/keymanager/v1/direct_test.go +++ b/validator/keymanager/v1/direct_test.go @@ -1,6 +1,7 @@ package v1_test import ( + "context" "testing" "github.com/prysmaticlabs/prysm/shared/bls" @@ -41,7 +42,7 @@ func TestDirectListValidatingKeysMultiple(t *testing.T) { func TestSignNoSuchKey(t *testing.T) { sks := make([]bls.SecretKey, 0) direct := keymanager.NewDirect(sks) - _, err := direct.Sign([48]byte{}, [32]byte{}) + _, err := direct.Sign(context.Background(), [48]byte{}, [32]byte{}) assert.ErrorContains(t, keymanager.ErrNoSuchKey.Error(), err) } @@ -52,7 +53,7 @@ func TestSign(t *testing.T) { pubKey := bytesutil.ToBytes48(sks[0].PublicKey().Marshal()) msg := [32]byte{} - sig, err := direct.Sign(pubKey, msg) + sig, err := direct.Sign(context.Background(), pubKey, msg) require.NoError(t, err) require.Equal(t, true, sig.Verify(sks[0].PublicKey(), bytesutil.FromBytes32(msg)), "Failed to verify generated signature") } diff --git a/validator/keymanager/v1/keymanager.go b/validator/keymanager/v1/keymanager.go index 28e39256e0..d66e601f58 100644 --- a/validator/keymanager/v1/keymanager.go +++ b/validator/keymanager/v1/keymanager.go @@ -1,6 +1,7 @@ package v1 import ( + "context" "errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -22,7 +23,7 @@ type KeyManager interface { FetchValidatingKeys() ([][48]byte, error) // Sign signs a message for the validator to broadcast. // Note that the domain should already be part of the root, but it is passed along for security purposes. - Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) + Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) } // ProtectingKeyManager provides access to a keymanager that protects its clients from slashing events. diff --git a/validator/keymanager/v1/remote.go b/validator/keymanager/v1/remote.go index 53d3b74ea4..854c8ce5c0 100644 --- a/validator/keymanager/v1/remote.go +++ b/validator/keymanager/v1/remote.go @@ -155,12 +155,12 @@ func (km *Remote) FetchValidatingKeys() ([][48]byte, error) { } // Sign without protection is not supported by remote keymanagers. -func (km *Remote) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) { +func (km *Remote) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) { return nil, errors.New("remote keymanager does not support unprotected signing") } // SignGeneric signs a generic message for the validator to broadcast. -func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) (bls.Signature, error) { +func (km *Remote) SignGeneric(ctx context.Context, pubKey [48]byte, root [32]byte, domain [32]byte) (bls.Signature, error) { accountInfo, exists := km.accounts[pubKey] if !exists { return nil, ErrNoSuchKey @@ -172,7 +172,7 @@ func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) ( Data: root[:], Domain: domain[:], } - resp, err := client.Sign(context.Background(), req) + resp, err := client.Sign(ctx, req) if err != nil { return nil, err } @@ -186,7 +186,7 @@ func (km *Remote) SignGeneric(pubKey [48]byte, root [32]byte, domain [32]byte) ( } // SignProposal signs a block proposal for the validator to broadcast. -func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.BeaconBlockHeader) (bls.Signature, error) { +func (km *Remote) SignProposal(ctx context.Context, pubKey [48]byte, domain [32]byte, data *ethpb.BeaconBlockHeader) (bls.Signature, error) { accountInfo, exists := km.accounts[pubKey] if !exists { return nil, ErrNoSuchKey @@ -204,7 +204,7 @@ func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.Bea BodyRoot: data.BodyRoot, }, } - resp, err := client.SignBeaconProposal(context.Background(), req) + resp, err := client.SignBeaconProposal(ctx, req) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func (km *Remote) SignProposal(pubKey [48]byte, domain [32]byte, data *ethpb.Bea } // SignAttestation signs an attestation for the validator to broadcast. -func (km *Remote) SignAttestation(pubKey [48]byte, domain [32]byte, data *ethpb.AttestationData) (bls.Signature, error) { +func (km *Remote) SignAttestation(ctx context.Context, pubKey [48]byte, domain [32]byte, data *ethpb.AttestationData) (bls.Signature, error) { accountInfo, exists := km.accounts[pubKey] if !exists { return nil, ErrNoSuchKey @@ -242,7 +242,7 @@ func (km *Remote) SignAttestation(pubKey [48]byte, domain [32]byte, data *ethpb. }, }, } - resp, err := client.SignBeaconAttestation(context.Background(), req) + resp, err := client.SignBeaconAttestation(ctx, req) if err != nil { return nil, err } @@ -261,7 +261,7 @@ func (km *Remote) RefreshValidatingKeys() error { listAccountsReq := &pb.ListAccountsRequest{ Paths: km.paths, } - resp, err := listerClient.ListAccounts(context.Background(), listAccountsReq) + resp, err := listerClient.ListAccounts(context.TODO(), listAccountsReq) if err != nil { return err } diff --git a/validator/keymanager/v1/wallet.go b/validator/keymanager/v1/wallet.go index 3fb1ee2311..774aa6a125 100644 --- a/validator/keymanager/v1/wallet.go +++ b/validator/keymanager/v1/wallet.go @@ -141,7 +141,7 @@ func (km *Wallet) FetchValidatingKeys() ([][48]byte, error) { } // Sign signs a message for the validator to broadcast. -func (km *Wallet) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) { +func (km *Wallet) Sign(ctx context.Context, pubKey [48]byte, root [32]byte) (bls.Signature, error) { account, exists := km.accounts[pubKey] if !exists { return nil, ErrNoSuchKey @@ -151,7 +151,7 @@ func (km *Wallet) Sign(pubKey [48]byte, root [32]byte) (bls.Signature, error) { if !ok { return nil, errors.New("account does not implement the AccountSigner interface") } - sig, err := signer.Sign(context.Background(), root[:]) + sig, err := signer.Sign(ctx, root[:]) if err != nil { return nil, err } diff --git a/validator/keymanager/v2/direct/direct.go b/validator/keymanager/v2/direct/direct.go index fffc7bec54..ba94abadcf 100644 --- a/validator/keymanager/v2/direct/direct.go +++ b/validator/keymanager/v2/direct/direct.go @@ -349,7 +349,7 @@ func (dr *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (b } func (dr *Keymanager) initializeAccountKeystore(ctx context.Context) error { - encoded, err := dr.wallet.ReadFileAtPath(context.Background(), AccountsPath, accountsKeystoreFileName) + encoded, err := dr.wallet.ReadFileAtPath(ctx, AccountsPath, accountsKeystoreFileName) if err != nil && strings.Contains(err.Error(), "no files found") { // If there are no keys to initialize at all, just exit. return nil diff --git a/validator/main.go b/validator/main.go index 6b71f77437..9f74a212a4 100644 --- a/validator/main.go +++ b/validator/main.go @@ -196,7 +196,7 @@ contract in order to activate the validator client`, return err } } - ctx, cancel := context.WithTimeout(context.Background(), connTimeout) + ctx, cancel := context.WithTimeout(cliCtx.Context, connTimeout) defer cancel() dialOpts := client.ConstructDialOptions( cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name), @@ -211,7 +211,7 @@ contract in order to activate the validator client`, log.WithError(err).Errorf("Failed to dial beacon node endpoint at %s", endpoint) return err } - err = v1.RunStatusCommand(pubKeys, ethpb.NewBeaconNodeValidatorClient(conn)) + err = v1.RunStatusCommand(ctx, pubKeys, ethpb.NewBeaconNodeValidatorClient(conn)) if closed := conn.Close(); closed != nil { log.WithError(closed).Error("Could not close connection to beacon node") } @@ -259,7 +259,7 @@ contract in order to activate the validator client`, sources := strings.Split(passedSources, ",") target := cliCtx.String(flags.TargetDirectory.Name) - if err := v1.Merge(context.Background(), sources, target); err != nil { + if err := v1.Merge(cliCtx.Context, sources, target); err != nil { log.WithError(err).Error("Merging validator data failed") } else { log.Info("Merge completed successfully") @@ -279,7 +279,7 @@ contract in order to activate the validator client`, source := cliCtx.String(flags.SourceDirectory.Name) target := cliCtx.String(flags.TargetDirectory.Name) - if err := v1.Split(context.Background(), source, target); err != nil { + if err := v1.Split(cliCtx.Context, source, target); err != nil { log.WithError(err).Error("Splitting validator data failed") } else { log.Info("Split completed successfully") diff --git a/validator/node/node.go b/validator/node/node.go index 3406f99bbf..a5d56fd41f 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -4,7 +4,6 @@ package node import ( - "context" "fmt" "io/ioutil" "os" @@ -329,7 +328,7 @@ func (s *ValidatorClient) registerClientService( if err := s.services.FetchService(&sp); err == nil { protector = sp } - v, err := client.NewValidatorService(context.Background(), &client.Config{ + v, err := client.NewValidatorService(s.cliCtx.Context, &client.Config{ Endpoint: endpoint, DataDir: dataDir, KeyManager: keyManager, @@ -363,7 +362,7 @@ func (s *ValidatorClient) registerSlasherClientService() error { maxCallRecvMsgSize := s.cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name) grpcRetries := s.cliCtx.Uint(flags.GrpcRetriesFlag.Name) grpcRetryDelay := s.cliCtx.Duration(flags.GrpcRetryDelayFlag.Name) - sp, err := slashing_protection.NewSlashingProtectionService(context.Background(), &slashing_protection.Config{ + sp, err := slashing_protection.NewSlashingProtectionService(s.cliCtx.Context, &slashing_protection.Config{ Endpoint: endpoint, CertFlag: cert, GrpcMaxCallRecvMsgSizeFlag: maxCallRecvMsgSize, @@ -385,7 +384,7 @@ func (s *ValidatorClient) registerRPCService(cliCtx *cli.Context) error { rpcHost := cliCtx.String(flags.RPCHost.Name) rpcPort := cliCtx.Int(flags.RPCPort.Name) nodeGatewayEndpoint := cliCtx.String(flags.BeaconRPCGatewayProviderFlag.Name) - server := rpc.NewServer(context.Background(), &rpc.Config{ + server := rpc.NewServer(cliCtx.Context, &rpc.Config{ ValDB: s.db, Host: rpcHost, Port: fmt.Sprintf("%d", rpcPort), @@ -407,7 +406,7 @@ func (s *ValidatorClient) registerRPCGatewayService(cliCtx *cli.Context) error { gatewayAddress := fmt.Sprintf("%s:%d", gatewayHost, gatewayPort) allowedOrigins := strings.Split(cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",") gatewaySrv := gateway.New( - context.Background(), + cliCtx.Context, rpcAddr, gatewayAddress, allowedOrigins,