diff --git a/beacon-chain/db/kv/regen_historical_states.go b/beacon-chain/db/kv/regen_historical_states.go index d49e6747f3..20f046b28f 100644 --- a/beacon-chain/db/kv/regen_historical_states.go +++ b/beacon-chain/db/kv/regen_historical_states.go @@ -68,6 +68,11 @@ func (kv *Store) regenHistoricalStates(ctx context.Context) error { return err } for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ { + // This is an expensive operation, so we check if the context was canceled + // at any point in the iteration. + if err := ctx.Err(); err != nil { + return err + } targetSlot := startSlot + slotsPerArchivedPoint filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot) blocks, err := kv.Blocks(ctx, filter) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 1dad1bce60..affe3f24d2 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -59,7 +59,9 @@ const testSkipPowFlag = "test-skip-pow" // full PoS node. It handles the lifecycle of the entire system and registers // services to a service registry. type BeaconNode struct { - ctx *cli.Context + cliCtx *cli.Context + ctx context.Context + cancel context.CancelFunc services *shared.ServiceRegistry lock sync.RWMutex stop chan struct{} // Channel to wait for termination notifications. @@ -78,23 +80,26 @@ type BeaconNode struct { // NewBeaconNode creates a new node instance, sets up configuration options, and registers // every required service to the node. -func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { +func NewBeaconNode(cliCtx *cli.Context) (*BeaconNode, error) { if err := tracing.Setup( "beacon-chain", // service name - ctx.String(cmd.TracingProcessNameFlag.Name), - ctx.String(cmd.TracingEndpointFlag.Name), - ctx.Float64(cmd.TraceSampleFractionFlag.Name), - ctx.Bool(cmd.EnableTracingFlag.Name), + cliCtx.String(cmd.TracingProcessNameFlag.Name), + cliCtx.String(cmd.TracingEndpointFlag.Name), + cliCtx.Float64(cmd.TraceSampleFractionFlag.Name), + cliCtx.Bool(cmd.EnableTracingFlag.Name), ); err != nil { return nil, err } - featureconfig.ConfigureBeaconChain(ctx) - flags.ConfigureGlobalFlags(ctx) + featureconfig.ConfigureBeaconChain(cliCtx) + flags.ConfigureGlobalFlags(cliCtx) registry := shared.NewServiceRegistry() + ctx, cancel := context.WithCancel(cliCtx) beacon := &BeaconNode{ + cliCtx: cliCtx, ctx: ctx, + cancel: cancel, services: registry, stop: make(chan struct{}), stateFeed: new(event.Feed), @@ -106,17 +111,17 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { stateSummaryCache: cache.NewStateSummaryCache(), } - if err := beacon.startDB(ctx); err != nil { + if err := beacon.startDB(cliCtx); err != nil { return nil, err } beacon.startStateGen() - if err := beacon.registerP2P(ctx); err != nil { + if err := beacon.registerP2P(cliCtx); err != nil { return nil, err } - if err := beacon.registerPOWChainService(ctx); err != nil { + if err := beacon.registerPOWChainService(); err != nil { return nil, err } @@ -124,38 +129,38 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { return nil, err } - if err := beacon.registerInteropServices(ctx); err != nil { + if err := beacon.registerInteropServices(); err != nil { return nil, err } beacon.startForkChoice() - if err := beacon.registerBlockchainService(ctx); err != nil { + if err := beacon.registerBlockchainService(); err != nil { return nil, err } - if err := beacon.registerInitialSyncService(ctx); err != nil { + if err := beacon.registerInitialSyncService(); err != nil { return nil, err } - if err := beacon.registerSyncService(ctx); err != nil { + if err := beacon.registerSyncService(); err != nil { return nil, err } - if err := beacon.registerRPCService(ctx); err != nil { + if err := beacon.registerRPCService(); err != nil { return nil, err } - if err := beacon.registerGRPCGateway(ctx); err != nil { + if err := beacon.registerGRPCGateway(); err != nil { return nil, err } - if err := beacon.registerArchiverService(ctx); err != nil { + if err := beacon.registerArchiverService(); err != nil { return nil, err } - if !ctx.Bool(cmd.DisableMonitoringFlag.Name) { - if err := beacon.registerPrometheusService(ctx); err != nil { + if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) { + if err := beacon.registerPrometheusService(); err != nil { return nil, err } } @@ -197,7 +202,7 @@ func (b *BeaconNode) Start() { defer signal.Stop(sigc) <-sigc log.Info("Got interrupt, shutting down...") - debug.Exit(b.ctx) // Ensure trace and CPU profile data are flushed. + debug.Exit(b.cliCtx) // Ensure trace and CPU profile data are flushed. go b.Close() for i := 10; i > 0; i-- { <-sigc @@ -218,6 +223,7 @@ func (b *BeaconNode) Close() { defer b.lock.Unlock() log.Info("Stopping beacon node") + b.cancel() // Cancel the beacon node struct's context. b.services.StopAll() if err := b.db.Close(); err != nil { log.Errorf("Failed to close database: %v", err) @@ -230,11 +236,11 @@ func (b *BeaconNode) startForkChoice() { b.forkChoiceStore = f } -func (b *BeaconNode) startDB(ctx *cli.Context) error { - baseDir := ctx.String(cmd.DataDirFlag.Name) +func (b *BeaconNode) startDB(cliCtx *cli.Context) error { + baseDir := cliCtx.String(cmd.DataDirFlag.Name) dbPath := path.Join(baseDir, beaconChainDBName) - clearDB := ctx.Bool(cmd.ClearDB.Name) - forceClearDB := ctx.Bool(cmd.ForceClearDB.Name) + clearDB := cliCtx.Bool(cmd.ClearDB.Name) + forceClearDB := cliCtx.Bool(cmd.ForceClearDB.Name) d, err := db.NewDB(dbPath, b.stateSummaryCache) if err != nil { @@ -260,7 +266,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error { return err } } else { - if err := d.HistoricalStatesDeleted(ctx); err != nil { + if err := d.HistoricalStatesDeleted(b.ctx); err != nil { return err } } @@ -275,9 +281,9 @@ func (b *BeaconNode) startStateGen() { b.stateGen = stategen.New(b.db, b.stateSummaryCache) } -func (b *BeaconNode) registerP2P(ctx *cli.Context) error { +func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { // Bootnode ENR may be a filepath to an ENR file. - bootnodeAddrs := strings.Split(ctx.String(cmd.BootstrapNode.Name), ",") + bootnodeAddrs := strings.Split(cliCtx.String(cmd.BootstrapNode.Name), ",") for i, addr := range bootnodeAddrs { if filepath.Ext(addr) == ".enr" { b, err := ioutil.ReadFile(addr) @@ -288,31 +294,31 @@ func (b *BeaconNode) registerP2P(ctx *cli.Context) error { } } - datadir := ctx.String(cmd.DataDirFlag.Name) + datadir := cliCtx.String(cmd.DataDirFlag.Name) if datadir == "" { datadir = cmd.DefaultDataDir() } svc, err := p2p.NewService(&p2p.Config{ - NoDiscovery: ctx.Bool(cmd.NoDiscovery.Name), - StaticPeers: sliceutil.SplitCommaSeparated(ctx.StringSlice(cmd.StaticPeers.Name)), + NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name), + StaticPeers: sliceutil.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)), BootstrapNodeAddr: bootnodeAddrs, - RelayNodeAddr: ctx.String(cmd.RelayNode.Name), + RelayNodeAddr: cliCtx.String(cmd.RelayNode.Name), DataDir: datadir, - LocalIP: ctx.String(cmd.P2PIP.Name), - HostAddress: ctx.String(cmd.P2PHost.Name), - HostDNS: ctx.String(cmd.P2PHostDNS.Name), - PrivateKey: ctx.String(cmd.P2PPrivKey.Name), - MetaDataDir: ctx.String(cmd.P2PMetadata.Name), - TCPPort: ctx.Uint(cmd.P2PTCPPort.Name), - UDPPort: ctx.Uint(cmd.P2PUDPPort.Name), - MaxPeers: ctx.Uint(cmd.P2PMaxPeers.Name), - WhitelistCIDR: ctx.String(cmd.P2PWhitelist.Name), - EnableUPnP: ctx.Bool(cmd.EnableUPnPFlag.Name), - DisableDiscv5: ctx.Bool(flags.DisableDiscv5.Name), - Encoding: ctx.String(cmd.P2PEncoding.Name), + LocalIP: cliCtx.String(cmd.P2PIP.Name), + HostAddress: cliCtx.String(cmd.P2PHost.Name), + HostDNS: cliCtx.String(cmd.P2PHostDNS.Name), + PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), + MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name), + TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), + UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), + MaxPeers: cliCtx.Uint(cmd.P2PMaxPeers.Name), + WhitelistCIDR: cliCtx.String(cmd.P2PWhitelist.Name), + EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name), + DisableDiscv5: cliCtx.Bool(flags.DisableDiscv5.Name), + Encoding: cliCtx.String(cmd.P2PEncoding.Name), StateNotifier: b, - PubSub: ctx.String(cmd.P2PPubsub.Name), + PubSub: cliCtx.String(cmd.P2PPubsub.Name), }) if err != nil { return err @@ -320,7 +326,7 @@ func (b *BeaconNode) registerP2P(ctx *cli.Context) error { return b.services.RegisterService(svc) } -func (b *BeaconNode) fetchP2P(ctx *cli.Context) p2p.P2P { +func (b *BeaconNode) fetchP2P() p2p.P2P { var p *p2p.Service if err := b.services.FetchService(&p); err != nil { panic(err) @@ -329,7 +335,7 @@ func (b *BeaconNode) fetchP2P(ctx *cli.Context) p2p.P2P { } func (b *BeaconNode) registerAttestationPool() error { - s, err := attestations.NewService(context.Background(), &attestations.Config{ + s, err := attestations.NewService(b.ctx, &attestations.Config{ Pool: b.attestationPool, }) if err != nil { @@ -338,7 +344,7 @@ func (b *BeaconNode) registerAttestationPool() error { return b.services.RegisterService(s) } -func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { +func (b *BeaconNode) registerBlockchainService() error { var web3Service *powchain.Service if err := b.services.FetchService(&web3Service); err != nil { return err @@ -349,15 +355,15 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { return err } - maxRoutines := ctx.Int64(cmd.MaxGoroutines.Name) - blockchainService, err := blockchain.NewService(context.Background(), &blockchain.Config{ + maxRoutines := b.cliCtx.Int64(cmd.MaxGoroutines.Name) + blockchainService, err := blockchain.NewService(b.ctx, &blockchain.Config{ BeaconDB: b.db, DepositCache: b.depositCache, ChainStartFetcher: web3Service, AttPool: b.attestationPool, ExitPool: b.exitPool, SlashingPool: b.slashingsPool, - P2p: b.fetchP2P(ctx), + P2p: b.fetchP2P(), MaxRoutines: maxRoutines, StateNotifier: b, ForkChoiceStore: b.forkChoiceStore, @@ -370,11 +376,11 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { return b.services.RegisterService(blockchainService) } -func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error { - if cliCtx.Bool(testSkipPowFlag) { +func (b *BeaconNode) registerPOWChainService() error { + if b.cliCtx.Bool(testSkipPowFlag) { return b.services.RegisterService(&powchain.Service{}) } - depAddress := cliCtx.String(flags.DepositContractFlag.Name) + depAddress := b.cliCtx.String(flags.DepositContractFlag.Name) if depAddress == "" { log.Fatal(fmt.Sprintf("%s is required", flags.DepositContractFlag.Name)) } @@ -383,25 +389,24 @@ func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error { log.Fatalf("Invalid deposit contract address given: %s", depAddress) } - ctx := context.Background() cfg := &powchain.Web3ServiceConfig{ - ETH1Endpoint: cliCtx.String(flags.Web3ProviderFlag.Name), - HTTPEndPoint: cliCtx.String(flags.HTTPWeb3ProviderFlag.Name), + ETH1Endpoint: b.cliCtx.String(flags.Web3ProviderFlag.Name), + HTTPEndPoint: b.cliCtx.String(flags.HTTPWeb3ProviderFlag.Name), DepositContract: common.HexToAddress(depAddress), BeaconDB: b.db, DepositCache: b.depositCache, StateNotifier: b, } - web3Service, err := powchain.NewService(ctx, cfg) + web3Service, err := powchain.NewService(b.ctx, cfg) if err != nil { return errors.Wrap(err, "could not register proof-of-work chain web3Service") } - knownContract, err := b.db.DepositContractAddress(ctx) + knownContract, err := b.db.DepositContractAddress(b.ctx) if err != nil { return err } if len(knownContract) == 0 { - if err := b.db.SaveDepositContractAddress(ctx, cfg.DepositContract); err != nil { + if err := b.db.SaveDepositContractAddress(b.ctx, cfg.DepositContract); err != nil { return errors.Wrap(err, "could not save deposit contract") } } @@ -411,7 +416,7 @@ func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error { return b.services.RegisterService(web3Service) } -func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { +func (b *BeaconNode) registerSyncService() error { var web3Service *powchain.Service if err := b.services.FetchService(&web3Service); err != nil { return err @@ -439,7 +444,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { rs := prysmsync.NewRegularSync(&prysmsync.Config{ DB: b.db, - P2P: b.fetchP2P(ctx), + P2P: b.fetchP2P(), Chain: chainService, InitialSync: initSync, StateNotifier: b, @@ -455,7 +460,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { return b.services.RegisterService(rs) } -func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { +func (b *BeaconNode) registerInitialSyncService() error { var chainService *blockchain.Service if err := b.services.FetchService(&chainService); err != nil { return err @@ -465,7 +470,7 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { is := initialsyncold.NewInitialSync(&initialsyncold.Config{ DB: b.db, Chain: chainService, - P2P: b.fetchP2P(ctx), + P2P: b.fetchP2P(), StateNotifier: b, BlockNotifier: b, }) @@ -475,14 +480,14 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { is := initialsync.NewInitialSync(&initialsync.Config{ DB: b.db, Chain: chainService, - P2P: b.fetchP2P(ctx), + P2P: b.fetchP2P(), StateNotifier: b, BlockNotifier: b, }) return b.services.RegisterService(is) } -func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { +func (b *BeaconNode) registerRPCService() error { var chainService *blockchain.Service if err := b.services.FetchService(&chainService); err != nil { return err @@ -508,8 +513,8 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { syncService = initSyncTmp } - genesisValidators := ctx.Uint64(flags.InteropNumValidatorsFlag.Name) - genesisStatePath := ctx.String(flags.InteropGenesisStateFlag.Name) + genesisValidators := b.cliCtx.Uint64(flags.InteropNumValidatorsFlag.Name) + genesisStatePath := b.cliCtx.String(flags.InteropGenesisStateFlag.Name) var depositFetcher depositcache.DepositFetcher var chainStartFetcher powchain.ChainStartFetcher if genesisValidators > 0 || genesisStatePath != "" { @@ -524,22 +529,22 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { chainStartFetcher = web3Service } - host := ctx.String(flags.RPCHost.Name) - port := ctx.String(flags.RPCPort.Name) - cert := ctx.String(flags.CertFlag.Name) - key := ctx.String(flags.KeyFlag.Name) - slasherCert := ctx.String(flags.SlasherCertFlag.Name) - slasherProvider := ctx.String(flags.SlasherProviderFlag.Name) - - mockEth1DataVotes := ctx.Bool(flags.InteropMockEth1DataVotesFlag.Name) - rpcService := rpc.NewService(context.Background(), &rpc.Config{ + host := b.cliCtx.String(flags.RPCHost.Name) + port := b.cliCtx.String(flags.RPCPort.Name) + cert := b.cliCtx.String(flags.CertFlag.Name) + key := b.cliCtx.String(flags.KeyFlag.Name) + slasherCert := b.cliCtx.String(flags.SlasherCertFlag.Name) + slasherProvider := b.cliCtx.String(flags.SlasherProviderFlag.Name) + mockEth1DataVotes := b.cliCtx.Bool(flags.InteropMockEth1DataVotesFlag.Name) + p2pService := b.fetchP2P() + rpcService := rpc.NewService(b.ctx, &rpc.Config{ Host: host, Port: port, CertFlag: cert, KeyFlag: key, BeaconDB: b.db, - Broadcaster: b.fetchP2P(ctx), - PeersFetcher: b.fetchP2P(ctx), + Broadcaster: p2pService, + PeersFetcher: p2pService, HeadFetcher: chainService, ForkFetcher: chainService, FinalizationFetcher: chainService, @@ -568,7 +573,7 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { return b.services.RegisterService(rpcService) } -func (b *BeaconNode) registerPrometheusService(ctx *cli.Context) error { +func (b *BeaconNode) registerPrometheusService() error { var additionalHandlers []prometheus.Handler var p *p2p.Service if err := b.services.FetchService(&p); err != nil { @@ -588,7 +593,7 @@ func (b *BeaconNode) registerPrometheusService(ctx *cli.Context) error { additionalHandlers = append(additionalHandlers, prometheus.Handler{Path: "/tree", Handler: c.TreeHandler}) service := prometheus.NewPrometheusService( - fmt.Sprintf(":%d", ctx.Int64(flags.MonitoringPortFlag.Name)), + fmt.Sprintf(":%d", b.cliCtx.Int64(flags.MonitoringPortFlag.Name)), b.services, additionalHandlers..., ) @@ -597,24 +602,32 @@ func (b *BeaconNode) registerPrometheusService(ctx *cli.Context) error { return b.services.RegisterService(service) } -func (b *BeaconNode) registerGRPCGateway(ctx *cli.Context) error { - gatewayPort := ctx.Int(flags.GRPCGatewayPort.Name) +func (b *BeaconNode) registerGRPCGateway() error { + gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name) if gatewayPort > 0 { - selfAddress := fmt.Sprintf("127.0.0.1:%d", ctx.Int(flags.RPCPort.Name)) + selfAddress := fmt.Sprintf("127.0.0.1:%d", b.cliCtx.Int(flags.RPCPort.Name)) gatewayAddress := fmt.Sprintf("0.0.0.0:%d", gatewayPort) - allowedOrigins := strings.Split(ctx.String(flags.GPRCGatewayCorsDomain.Name), ",") - return b.services.RegisterService(gateway.New(context.Background(), selfAddress, gatewayAddress, nil /*optional mux*/, allowedOrigins)) + allowedOrigins := strings.Split(b.cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",") + return b.services.RegisterService( + gateway.New( + b.ctx, + selfAddress, + gatewayAddress, + nil, /*optional mux*/ + allowedOrigins, + ), + ) } return nil } -func (b *BeaconNode) registerInteropServices(ctx *cli.Context) error { - genesisTime := ctx.Uint64(flags.InteropGenesisTimeFlag.Name) - genesisValidators := ctx.Uint64(flags.InteropNumValidatorsFlag.Name) - genesisStatePath := ctx.String(flags.InteropGenesisStateFlag.Name) +func (b *BeaconNode) registerInteropServices() error { + genesisTime := b.cliCtx.Uint64(flags.InteropGenesisTimeFlag.Name) + genesisValidators := b.cliCtx.Uint64(flags.InteropNumValidatorsFlag.Name) + genesisStatePath := b.cliCtx.String(flags.InteropGenesisStateFlag.Name) if genesisValidators > 0 || genesisStatePath != "" { - svc := interopcoldstart.NewColdStartService(context.Background(), &interopcoldstart.Config{ + svc := interopcoldstart.NewColdStartService(b.ctx, &interopcoldstart.Config{ GenesisTime: genesisTime, NumValidators: genesisValidators, BeaconDB: b.db, @@ -627,7 +640,7 @@ func (b *BeaconNode) registerInteropServices(ctx *cli.Context) error { return nil } -func (b *BeaconNode) registerArchiverService(ctx *cli.Context) error { +func (b *BeaconNode) registerArchiverService() error { if !flags.Get().EnableArchive { return nil } @@ -635,7 +648,7 @@ func (b *BeaconNode) registerArchiverService(ctx *cli.Context) error { if err := b.services.FetchService(&chainService); err != nil { return err } - svc := archiver.NewArchiverService(context.Background(), &archiver.Config{ + svc := archiver.NewArchiverService(b.ctx, &archiver.Config{ BeaconDB: b.db, HeadFetcher: chainService, ParticipationFetcher: chainService,