feat(gas-oracle & rollup-relayer): graceful shutdown

This commit is contained in:
colinlyguo
2024-01-05 22:43:59 +08:00
parent b96e8778a5
commit 252c9a76ef
7 changed files with 126 additions and 31 deletions

View File

@@ -50,18 +50,13 @@ func action(ctx *cli.Context) error {
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
instanceCtx, instanceCancel := context.WithCancel(ctx.Context)
loopCtx, loopCancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := database.InitDB(cfg.DBConfig)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
if err = database.CloseDB(db); err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
registry := prometheus.DefaultRegisterer
observability.Server(ctx, db)
@@ -79,20 +74,20 @@ func action(ctx *cli.Context) error {
return err
}
l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)
l1watcher := watcher.NewL1WatcherClient(instanceCtx, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, registry)
l1relayer, err := relayer.NewLayer1Relayer(instanceCtx, db, cfg.L1Config.RelayerConfig, registry)
if err != nil {
log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err)
return err
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry)
l2relayer, err := relayer.NewLayer2Relayer(instanceCtx, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry)
if err != nil {
log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err)
return err
}
// Start l1 watcher process
go utils.LoopWithContext(subCtx, 10*time.Second, func(ctx context.Context) {
go utils.LoopWithContext(loopCtx, 10*time.Second, func(ctx context.Context) {
// Fetch the latest block number to decrease the delay when fetching gas prices
// Use latest block number - 1 to prevent frequent reorg
number, loopErr := butils.GetLatestConfirmedBlockNumber(ctx, l1client, rpc.LatestBlockNumber)
@@ -106,9 +101,30 @@ func action(ctx *cli.Context) error {
}
})
// Start l1relayer process
go utils.Loop(subCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle)
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle)
// Goroutines that may send transactions periodically.
go utils.Loop(loopCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle)
go utils.Loop(loopCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle)
defer func() {
// Initiate the graceful shutdown process.
log.Info("Graceful shutdown initiated")
// Prevent new transactions by cancelling the loop context.
loopCancel()
// Close relayers to ensure all pending transactions are processed.
// This includes any in-flight transactions that have not yet been confirmed.
l1relayer.Close()
l2relayer.Close()
// Halt confirmation signal handling by cancelling the instance context.
instanceCancel()
// Close the database connection.
if err = database.CloseDB(db); err != nil {
log.Error("Failed to close database connection", "error", err)
}
}()
// All gas-oracle loops have been started.
log.Info("Start gas-oracle successfully")

View File

@@ -50,18 +50,13 @@ func action(ctx *cli.Context) error {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
instanceCtx, instanceCancel := context.WithCancel(ctx.Context)
loopCtx, loopCancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := database.InitDB(cfg.DBConfig)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
if err = database.CloseDB(db); err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
registry := prometheus.DefaultRegisterer
observability.Server(ctx, db)
@@ -80,23 +75,23 @@ func action(ctx *cli.Context) error {
return err
}
chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, db, registry)
chunkProposer := watcher.NewChunkProposer(instanceCtx, cfg.L2Config.ChunkProposerConfig, db, registry)
if err != nil {
log.Error("failed to create chunkProposer", "config file", cfgFile, "error", err)
return err
}
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, db, registry)
batchProposer := watcher.NewBatchProposer(instanceCtx, cfg.L2Config.BatchProposerConfig, db, registry)
if err != nil {
log.Error("failed to create batchProposer", "config file", cfgFile, "error", err)
return err
}
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress,
l2watcher := watcher.NewL2WatcherClient(instanceCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress,
cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry)
// Watcher loop to fetch missing blocks
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
go utils.LoopWithContext(loopCtx, 2*time.Second, func(ctx context.Context) {
number, loopErr := butils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
if loopErr != nil {
log.Error("failed to get block number", "err", loopErr)
@@ -105,13 +100,31 @@ func action(ctx *cli.Context) error {
l2watcher.TryFetchRunningMissingBlocks(number)
})
go utils.Loop(subCtx, 2*time.Second, chunkProposer.TryProposeChunk)
// Goroutines that may send transactions periodically.
go utils.Loop(loopCtx, 2*time.Second, chunkProposer.TryProposeChunk)
go utils.Loop(loopCtx, 10*time.Second, batchProposer.TryProposeBatch)
go utils.Loop(loopCtx, 2*time.Second, l2relayer.ProcessPendingBatches)
go utils.Loop(loopCtx, 15*time.Second, l2relayer.ProcessCommittedBatches)
go utils.Loop(subCtx, 10*time.Second, batchProposer.TryProposeBatch)
defer func() {
// Initiate the graceful shutdown process.
log.Info("Graceful shutdown initiated")
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)
// Prevent new transactions by cancelling the loop context.
loopCancel()
go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessCommittedBatches)
// Close relayers to ensure all pending transactions are processed.
// This includes any in-flight transactions that have not yet been confirmed.
l2relayer.Close()
// Halt confirmation signal handling by cancelling the instance context.
instanceCancel()
// Close the database connection.
if err = database.CloseDB(db); err != nil {
log.Error("Failed to close database connection", "error", err)
}
}()
// All rollup-relayer loops have been started.
log.Info("Start rollup-relayer successfully")

View File

@@ -18,7 +18,9 @@
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000,
"pending_limit": 10
"pending_limit": 10,
"max_close_attempts": 12,
"close_attempt_interval_ms": 10000
},
"gas_oracle_config": {
"min_gas_price": 0,
@@ -46,7 +48,9 @@
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000,
"pending_limit": 10
"pending_limit": 10,
"max_close_attempts": 12,
"close_attempt_interval_ms": 10000
},
"gas_oracle_config": {
"min_gas_price": 0,

View File

@@ -35,6 +35,10 @@ type SenderConfig struct {
CheckBalanceTime uint64 `json:"check_balance_time"`
// The sender's pending count limit.
PendingLimit int `json:"pending_limit"`
// The maximum number of attempts to close the sender.
MaxCloseAttempts int `json:"max_close_attempts"`
// The interval (in milliseconds) between each close attempt.
CloseAttemptIntervalMs int `json:"close_attempt_interval_ms"`
}
// ChainMonitor this config is used to get batch status from chain_monitor API.

View File

@@ -161,3 +161,7 @@ func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) {
}
}
}
// Close shuts down the relayer's gas oracle sender by calling its Close
// method. It returns only after that call returns.
func (r *Layer1Relayer) Close() {
r.gasOracleSender.Close()
}

View File

@@ -720,3 +720,9 @@ func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) {
}
}
}
// Close shuts down the relayer's senders one after another: first the commit
// sender, then the finalize sender, and finally the gas oracle sender.
//
// The calls are sequential, so the total time spent here is the sum of the
// individual Close calls.
// NOTE(review): presumably each sender is a *Sender whose Close may wait for
// pending transactions — confirm against the field declarations.
func (r *Layer2Relayer) Close() {
r.commitSender.Close()
r.finalizeSender.Close()
r.gasOracleSender.Close()
}

View File

@@ -145,6 +145,24 @@ func NewSender(ctx context.Context, config *config.SenderConfig, priv *ecdsa.Pri
}
sender.metrics = initSenderMetrics(reg)
log.Info("Creating new sender",
"Service", service,
"Name", name,
"Endpoint", config.Endpoint,
"ChainID", chainID.Uint64(),
"TxType", config.TxType,
"MaxGasPrice", config.MaxGasPrice,
"MinBalance", config.MinBalance.Uint64(),
"EscalateBlocks", config.EscalateBlocks,
"Confirmations", config.Confirmations,
"EscalateMultiple", fmt.Sprintf("%d/%d", config.EscalateMultipleNum, config.EscalateMultipleDen),
"CheckPendingTimeSec", config.CheckPendingTime,
"CheckBalanceTimeSec", config.CheckBalanceTime,
"PendingLimit", config.PendingLimit,
"MaxCloseAttempts", config.MaxCloseAttempts,
"CloseAttemptIntervalMs", config.CloseAttemptIntervalMs,
)
go sender.loop(ctx)
return sender, nil
@@ -584,3 +602,33 @@ func (s *Sender) loop(ctx context.Context) {
}
}
}
// Close waits for the sender's pending transactions to drain and then closes
// stopCh.
//
// It checks the pending set at most config.MaxCloseAttempts times. While the
// set is non-empty it logs the remaining transactions and sleeps for
// config.CloseAttemptIntervalMs milliseconds before checking again. Close does
// not process transactions itself; it only waits for the set to become empty.
// If transactions are still pending once the attempts are exhausted, a warning
// is logged and the sender is closed anyway.
//
// Close must be called at most once: closing an already-closed channel panics.
func (s *Sender) Close() {
	maxAttempts := s.config.MaxCloseAttempts
	retryInterval := time.Duration(s.config.CloseAttemptIntervalMs) * time.Millisecond

	// n is the 1-based attempt number reported in the logs.
	for n := 1; n <= maxAttempts && !s.pendingTxs.IsEmpty(); n++ {
		log.Info("Attempting to close Sender: pending transactions unhandled.",
			"sender", s.auth.From.String(),
			"attempt", n,
			"txCount", s.pendingTxs.Count(),
		)

		// Report every transaction that is still pending.
		for entry := range s.pendingTxs.IterBuffered() {
			log.Info("Handling pending transaction",
				"keyInMap", entry.Key,
				"from", s.auth.From.String(),
				"hash", entry.Val.tx.Hash().String(),
				"nonce", entry.Val.tx.Nonce(),
			)
		}

		// Wait before checking the pending set again.
		time.Sleep(retryInterval)
	}

	if drained := s.pendingTxs.IsEmpty(); !drained {
		log.Warn("Sender closing with unhandled pending transactions.",
			"sender", s.auth.From.String(),
			"unhandledTxCount", s.pendingTxs.Count(),
		)
	}

	// NOTE(review): presumably this signals the sender's background loop to
	// stop — confirm against the loop's select on stopCh.
	close(s.stopCh)
}