From 252c9a76ef59ef834ec73ea7ec1d8e1ca217b145 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Fri, 5 Jan 2024 22:43:59 +0800 Subject: [PATCH] feat(gas-oracle & rollup-relayer): graceful shutdown --- rollup/cmd/gas_oracle/app/app.go | 44 +++++++++++------ rollup/cmd/rollup_relayer/app/app.go | 43 +++++++++++------ rollup/conf/config.json | 8 +++- rollup/internal/config/relayer.go | 4 ++ .../internal/controller/relayer/l1_relayer.go | 4 ++ .../internal/controller/relayer/l2_relayer.go | 6 +++ rollup/internal/controller/sender/sender.go | 48 +++++++++++++++++++ 7 files changed, 126 insertions(+), 31 deletions(-) diff --git a/rollup/cmd/gas_oracle/app/app.go b/rollup/cmd/gas_oracle/app/app.go index 230f18019..a5b0bfa63 100644 --- a/rollup/cmd/gas_oracle/app/app.go +++ b/rollup/cmd/gas_oracle/app/app.go @@ -50,18 +50,13 @@ func action(ctx *cli.Context) error { if err != nil { log.Crit("failed to load config file", "config file", cfgFile, "error", err) } - subCtx, cancel := context.WithCancel(ctx.Context) + instanceCtx, instanceCancel := context.WithCancel(ctx.Context) + loopCtx, loopCancel := context.WithCancel(ctx.Context) // Init db connection db, err := database.InitDB(cfg.DBConfig) if err != nil { log.Crit("failed to init db connection", "err", err) } - defer func() { - cancel() - if err = database.CloseDB(db); err != nil { - log.Error("can not close ormFactory", "error", err) - } - }() registry := prometheus.DefaultRegisterer observability.Server(ctx, db) @@ -79,20 +74,20 @@ func action(ctx *cli.Context) error { return err } - l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry) + l1watcher := watcher.NewL1WatcherClient(instanceCtx, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry) - l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, registry) + l1relayer, err := relayer.NewLayer1Relayer(instanceCtx, db, cfg.L1Config.RelayerConfig, registry) if err != nil { log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err) return err } - l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry) + l2relayer, err := relayer.NewLayer2Relayer(instanceCtx, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry) if err != nil { log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err) return err } // Start l1 watcher process - go utils.LoopWithContext(subCtx, 10*time.Second, func(ctx context.Context) { + go utils.LoopWithContext(loopCtx, 10*time.Second, func(ctx context.Context) { // Fetch the latest block number to decrease the delay when fetching gas prices // Use latest block number - 1 to prevent frequent reorg number, loopErr := butils.GetLatestConfirmedBlockNumber(ctx, l1client, rpc.LatestBlockNumber) @@ -106,9 +101,30 @@ func action(ctx *cli.Context) error { } }) - // Start l1relayer process - go utils.Loop(subCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle) - go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle) + // Goroutines that may send transactions periodically. + go utils.Loop(loopCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle) + go utils.Loop(loopCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle) + + defer func() { + // Initiate the graceful shutdown process. + log.Info("Graceful shutdown initiated") + + // Prevent new transactions by cancelling the loop context. + loopCancel() + + // Close relayers to ensure all pending transactions are processed. + // This includes any in-flight transactions that have not yet been confirmed. + l1relayer.Close() + l2relayer.Close() + + // Halt confirmation signal handling by cancelling the instance context. + instanceCancel() + + // Close the database connection. + if err = database.CloseDB(db); err != nil { + log.Error("Failed to close database connection", "error", err) + } + }() // Finish start all message relayer functions log.Info("Start gas-oracle successfully") diff --git a/rollup/cmd/rollup_relayer/app/app.go b/rollup/cmd/rollup_relayer/app/app.go index 63213fd29..5bf4ddc44 100644 --- a/rollup/cmd/rollup_relayer/app/app.go +++ b/rollup/cmd/rollup_relayer/app/app.go @@ -50,18 +50,13 @@ func action(ctx *cli.Context) error { log.Crit("failed to load config file", "config file", cfgFile, "error", err) } - subCtx, cancel := context.WithCancel(ctx.Context) + instanceCtx, instanceCancel := context.WithCancel(ctx.Context) + loopCtx, loopCancel := context.WithCancel(ctx.Context) // Init db connection db, err := database.InitDB(cfg.DBConfig) if err != nil { log.Crit("failed to init db connection", "err", err) } - defer func() { - cancel() - if err = database.CloseDB(db); err != nil { - log.Error("can not close ormFactory", "error", err) - } - }() registry := prometheus.DefaultRegisterer observability.Server(ctx, db) @@ -80,23 +75,23 @@ func action(ctx *cli.Context) error { return err } - chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, db, registry) + chunkProposer := watcher.NewChunkProposer(instanceCtx, cfg.L2Config.ChunkProposerConfig, db, registry) if err != nil { log.Error("failed to create chunkProposer", "config file", cfgFile, "error", err) return err } - batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, db, registry) + batchProposer := watcher.NewBatchProposer(instanceCtx, cfg.L2Config.BatchProposerConfig, db, registry) if err != nil { log.Error("failed to create batchProposer", "config file", cfgFile, "error", err) return err } - l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, + l2watcher := watcher.NewL2WatcherClient(instanceCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry) // Watcher loop to fetch missing blocks - go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) { + go utils.LoopWithContext(loopCtx, 2*time.Second, func(ctx context.Context) { number, loopErr := butils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations) if loopErr != nil { log.Error("failed to get block number", "err", loopErr) @@ -105,13 +100,31 @@ func action(ctx *cli.Context) error { l2watcher.TryFetchRunningMissingBlocks(number) }) - go utils.Loop(subCtx, 2*time.Second, chunkProposer.TryProposeChunk) + // Goroutines that may send transactions periodically. + go utils.Loop(loopCtx, 2*time.Second, chunkProposer.TryProposeChunk) + go utils.Loop(loopCtx, 10*time.Second, batchProposer.TryProposeBatch) + go utils.Loop(loopCtx, 2*time.Second, l2relayer.ProcessPendingBatches) + go utils.Loop(loopCtx, 15*time.Second, l2relayer.ProcessCommittedBatches) - go utils.Loop(subCtx, 10*time.Second, batchProposer.TryProposeBatch) + defer func() { + // Initiate the graceful shutdown process. + log.Info("Graceful shutdown initiated") - go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches) + // Prevent new transactions by cancelling the loop context. + loopCancel() - go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessCommittedBatches) + // Close relayers to ensure all pending transactions are processed. + // This includes any in-flight transactions that have not yet been confirmed. + l2relayer.Close() + + // Halt confirmation signal handling by cancelling the instance context. + instanceCancel() + + // Close the database connection. + if err = database.CloseDB(db); err != nil { + log.Error("Failed to close database connection", "error", err) + } + }() // Finish start all rollup relayer functions. log.Info("Start rollup-relayer successfully") diff --git a/rollup/conf/config.json b/rollup/conf/config.json index c02abe006..b1e84090b 100644 --- a/rollup/conf/config.json +++ b/rollup/conf/config.json @@ -18,7 +18,9 @@ "max_gas_price": 10000000000, "tx_type": "LegacyTx", "min_balance": 100000000000000000000, - "pending_limit": 10 + "pending_limit": 10, + "max_close_attempts": 12, + "close_attempt_interval_ms": 10000 }, "gas_oracle_config": { "min_gas_price": 0, @@ -46,7 +48,9 @@ "max_gas_price": 10000000000, "tx_type": "LegacyTx", "min_balance": 100000000000000000000, - "pending_limit": 10 + "pending_limit": 10, + "max_close_attempts": 12, + "close_attempt_interval_ms": 10000 }, "gas_oracle_config": { "min_gas_price": 0, diff --git a/rollup/internal/config/relayer.go b/rollup/internal/config/relayer.go index 9df5e18a7..334daf002 100644 --- a/rollup/internal/config/relayer.go +++ b/rollup/internal/config/relayer.go @@ -35,6 +35,10 @@ type SenderConfig struct { CheckBalanceTime uint64 `json:"check_balance_time"` // The sender's pending count limit. PendingLimit int `json:"pending_limit"` + // The maximum number of attempts to close the sender. + MaxCloseAttempts int `json:"max_close_attempts"` + // The interval (in seconds) between each close attempt. + CloseAttemptIntervalMs int `json:"close_attempt_interval_ms"` } // ChainMonitor this config is used to get batch status from chain_monitor API. diff --git a/rollup/internal/controller/relayer/l1_relayer.go b/rollup/internal/controller/relayer/l1_relayer.go index 401fc8a46..550ff34db 100644 --- a/rollup/internal/controller/relayer/l1_relayer.go +++ b/rollup/internal/controller/relayer/l1_relayer.go @@ -161,3 +161,7 @@ func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) { } } } + +func (r *Layer1Relayer) Close() { + r.gasOracleSender.Close() +} diff --git a/rollup/internal/controller/relayer/l2_relayer.go b/rollup/internal/controller/relayer/l2_relayer.go index fdcf63792..7f26ca519 100644 --- a/rollup/internal/controller/relayer/l2_relayer.go +++ b/rollup/internal/controller/relayer/l2_relayer.go @@ -720,3 +720,9 @@ func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) { } } } + +func (r *Layer2Relayer) Close() { + r.commitSender.Close() + r.finalizeSender.Close() + r.gasOracleSender.Close() +} diff --git a/rollup/internal/controller/sender/sender.go b/rollup/internal/controller/sender/sender.go index b496b43b5..bd4473efd 100644 --- a/rollup/internal/controller/sender/sender.go +++ b/rollup/internal/controller/sender/sender.go @@ -145,6 +145,24 @@ func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.Pri } sender.metrics = initSenderMetrics(reg) + log.Info("Creating new sender", + "Service", service, + "Name", name, + "Endpoint", config.Endpoint, + "ChainID", chainID.Uint64(), + "TxType", config.TxType, + "MaxGasPrice", config.MaxGasPrice, + "MinBalance", config.MinBalance.Uint64(), + "EscalateBlocks", config.EscalateBlocks, + "Confirmations", config.Confirmations, + "EscalateMultiple", fmt.Sprintf("%d/%d", config.EscalateMultipleNum, config.EscalateMultipleDen), + "CheckPendingTimeSec", config.CheckPendingTime, + "CheckBalanceTimeSec", config.CheckBalanceTime, + "PendingLimit", config.PendingLimit, + "MaxCloseAttempts", config.MaxCloseAttempts, + "CloseAttemptIntervalMs", config.CloseAttemptIntervalMs, + ) + go sender.loop(ctx) return sender, nil @@ -584,3 +602,33 @@ func (s *Sender) loop(ctx context.Context) { } } } + +func (s *Sender) Close() { + for attempt := 0; attempt < s.config.MaxCloseAttempts; attempt++ { + if s.pendingTxs.IsEmpty() { + break + } + log.Info("Attempting to close Sender: pending transactions unhandled.", + "sender", s.auth.From.String(), + "attempt", attempt+1, + "txCount", s.pendingTxs.Count(), + ) + for item := range s.pendingTxs.IterBuffered() { + key, pending := item.Key, item.Val + log.Info("Handling pending transaction", + "keyInMap", key, + "from", s.auth.From.String(), + "hash", pending.tx.Hash().String(), + "nonce", pending.tx.Nonce(), + ) + } + time.Sleep(time.Duration(s.config.CloseAttemptIntervalMs) * time.Millisecond) + } + if !s.pendingTxs.IsEmpty() { + log.Warn("Sender closing with unhandled pending transactions.", + "sender", s.auth.From.String(), + "unhandledTxCount", s.pendingTxs.Count(), + ) + } + close(s.stopCh) +}