mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-10 06:28:04 -05:00
fix(sender): enforce single instance for each sender type (#1104)
This commit is contained in:
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v4.3.60"
|
||||
var tag = "v4.3.61"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -79,11 +79,11 @@ func action(ctx *cli.Context) error {
|
||||
|
||||
l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry)
|
||||
|
||||
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, registry)
|
||||
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, db, cfg.L1Config.RelayerConfig, relayer.ServiceTypeL1GasOracle, registry)
|
||||
if err != nil {
|
||||
log.Crit("failed to create new l1 relayer", "config file", cfgFile, "error", err)
|
||||
}
|
||||
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, registry)
|
||||
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, false /* initGenesis */, relayer.ServiceTypeL2GasOracle, registry)
|
||||
if err != nil {
|
||||
log.Crit("failed to create new l2 relayer", "config file", cfgFile, "error", err)
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func action(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
initGenesis := ctx.Bool(utils.ImportGenesisFlag.Name)
|
||||
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, registry)
|
||||
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, initGenesis, relayer.ServiceTypeL2RollupRelayer, registry)
|
||||
if err != nil {
|
||||
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
|
||||
}
|
||||
|
||||
@@ -14,3 +14,17 @@ var (
|
||||
// ErrExecutionRevertedAlreadySuccessExecuted error of Message was already successfully executed
|
||||
ErrExecutionRevertedAlreadySuccessExecuted = errors.New("execution reverted: Message was already successfully executed")
|
||||
)
|
||||
|
||||
// ServiceType defines the various types of services within the relayer.
|
||||
type ServiceType int
|
||||
|
||||
const (
|
||||
// ServiceTypeUnknown indicates an unknown service type.
|
||||
ServiceTypeUnknown ServiceType = iota
|
||||
// ServiceTypeL2RollupRelayer indicates the service is a Layer 2 rollup relayer.
|
||||
ServiceTypeL2RollupRelayer
|
||||
// ServiceTypeL1GasOracle indicates the service is a Layer 1 gas oracle.
|
||||
ServiceTypeL1GasOracle
|
||||
// ServiceTypeL2GasOracle indicates the service is a Layer 2 gas oracle.
|
||||
ServiceTypeL2GasOracle
|
||||
)
|
||||
|
||||
@@ -42,16 +42,24 @@ type Layer1Relayer struct {
|
||||
}
|
||||
|
||||
// NewLayer1Relayer will return a new instance of Layer1RelayerClient
|
||||
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, reg prometheus.Registerer) (*Layer1Relayer, error) {
|
||||
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l1_relayer", "gas_oracle_sender", types.SenderTypeL1GasOracle, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %v", addr.Hex(), err)
|
||||
}
|
||||
func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfig, serviceType ServiceType, reg prometheus.Registerer) (*Layer1Relayer, error) {
|
||||
var gasOracleSender *sender.Sender
|
||||
var err error
|
||||
|
||||
// Ensure test features aren't enabled on the mainnet.
|
||||
if gasOracleSender.GetChainID() == big.NewInt(1) && cfg.EnableTestEnvBypassFeatures {
|
||||
return nil, fmt.Errorf("cannot enable test env features in mainnet")
|
||||
switch serviceType {
|
||||
case ServiceTypeL1GasOracle:
|
||||
gasOracleSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l1_relayer", "gas_oracle_sender", types.SenderTypeL1GasOracle, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %v", addr.Hex(), err)
|
||||
}
|
||||
|
||||
// Ensure test features aren't enabled on the scroll mainnet.
|
||||
if gasOracleSender.GetChainID().Cmp(big.NewInt(534352)) == 0 && cfg.EnableTestEnvBypassFeatures {
|
||||
return nil, fmt.Errorf("cannot enable test env features in mainnet")
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid service type for l1_relayer: %v", serviceType)
|
||||
}
|
||||
|
||||
var minGasPrice uint64
|
||||
@@ -78,7 +86,13 @@ func NewLayer1Relayer(ctx context.Context, db *gorm.DB, cfg *config.RelayerConfi
|
||||
|
||||
l1Relayer.metrics = initL1RelayerMetrics(reg)
|
||||
|
||||
go l1Relayer.handleConfirmLoop(ctx)
|
||||
switch serviceType {
|
||||
case ServiceTypeL1GasOracle:
|
||||
go l1Relayer.handleL1GasOracleConfirmLoop(ctx)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid service type for l1_relayer: %v", serviceType)
|
||||
}
|
||||
|
||||
return l1Relayer, nil
|
||||
}
|
||||
|
||||
@@ -158,7 +172,7 @@ func (r *Layer1Relayer) handleConfirmation(cfm *sender.Confirmation) {
|
||||
log.Info("Transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) {
|
||||
func (r *Layer1Relayer) handleL1GasOracleConfirmLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@@ -35,7 +35,7 @@ func setupL1RelayerDB(t *testing.T) *gorm.DB {
|
||||
func testCreateNewL1Relayer(t *testing.T) {
|
||||
db := setupL1RelayerDB(t)
|
||||
defer database.CloseDB(db)
|
||||
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, nil)
|
||||
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig, ServiceTypeL1GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, relayer)
|
||||
}
|
||||
@@ -56,7 +56,7 @@ func testL1RelayerGasOracleConfirm(t *testing.T) {
|
||||
l1Cfg := cfg.L1Config
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
|
||||
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, ServiceTypeL1GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Simulate message confirmations.
|
||||
@@ -88,7 +88,7 @@ func testL1RelayerProcessGasPriceOracle(t *testing.T) {
|
||||
l1Cfg := cfg.L1Config
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, nil)
|
||||
l1Relayer, err := NewLayer1Relayer(ctx, db, l1Cfg.RelayerConfig, ServiceTypeL1GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, l1Relayer)
|
||||
|
||||
|
||||
@@ -62,27 +62,43 @@ type Layer2Relayer struct {
|
||||
}
|
||||
|
||||
// NewLayer2Relayer will return a new instance of Layer2RelayerClient
|
||||
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, reg prometheus.Registerer) (*Layer2Relayer, error) {
|
||||
commitSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderPrivateKey, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.CommitSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new commit sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
finalizeSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.FinalizeSenderPrivateKey, "l2_relayer", "finalize_sender", types.SenderTypeFinalizeBatch, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.FinalizeSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new finalize sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, initGenesis bool, serviceType ServiceType, reg prometheus.Registerer) (*Layer2Relayer, error) {
|
||||
var gasOracleSender, commitSender, finalizeSender *sender.Sender
|
||||
var err error
|
||||
|
||||
gasOracleSender, err := sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l2_relayer", "gas_oracle_sender", types.SenderTypeL2GasOracle, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
switch serviceType {
|
||||
case ServiceTypeL2GasOracle:
|
||||
gasOracleSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.GasOracleSenderPrivateKey, "l2_relayer", "gas_oracle_sender", types.SenderTypeL2GasOracle, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.GasOracleSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new gas oracle sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
|
||||
// Ensure test features aren't enabled on the mainnet.
|
||||
if commitSender.GetChainID() == big.NewInt(1) && cfg.EnableTestEnvBypassFeatures {
|
||||
return nil, fmt.Errorf("cannot enable test env features in mainnet")
|
||||
// Ensure test features aren't enabled on the ethereum mainnet.
|
||||
if gasOracleSender.GetChainID().Cmp(big.NewInt(1)) == 0 && cfg.EnableTestEnvBypassFeatures {
|
||||
return nil, fmt.Errorf("cannot enable test env features in mainnet")
|
||||
}
|
||||
|
||||
case ServiceTypeL2RollupRelayer:
|
||||
commitSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderPrivateKey, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.CommitSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new commit sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
|
||||
finalizeSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.FinalizeSenderPrivateKey, "l2_relayer", "finalize_sender", types.SenderTypeFinalizeBatch, db, reg)
|
||||
if err != nil {
|
||||
addr := crypto.PubkeyToAddress(cfg.FinalizeSenderPrivateKey.PublicKey)
|
||||
return nil, fmt.Errorf("new finalize sender failed for address %s, err: %w", addr.Hex(), err)
|
||||
}
|
||||
|
||||
// Ensure test features aren't enabled on the ethereum mainnet.
|
||||
if commitSender.GetChainID().Cmp(big.NewInt(1)) == 0 && cfg.EnableTestEnvBypassFeatures {
|
||||
return nil, fmt.Errorf("cannot enable test env features in mainnet")
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType)
|
||||
}
|
||||
|
||||
var minGasPrice uint64
|
||||
@@ -133,7 +149,15 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.
|
||||
}
|
||||
layer2Relayer.metrics = initL2RelayerMetrics(reg)
|
||||
|
||||
go layer2Relayer.handleConfirmLoop(ctx)
|
||||
switch serviceType {
|
||||
case ServiceTypeL2GasOracle:
|
||||
go layer2Relayer.handleL2GasOracleConfirmLoop(ctx)
|
||||
case ServiceTypeL2RollupRelayer:
|
||||
go layer2Relayer.handleL2RollupRelayerConfirmLoop(ctx)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType)
|
||||
}
|
||||
|
||||
return layer2Relayer, nil
|
||||
}
|
||||
|
||||
@@ -673,7 +697,18 @@ func (r *Layer2Relayer) handleConfirmation(cfm *sender.Confirmation) {
|
||||
log.Info("Transaction confirmed in layer1", "confirmation", cfm)
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) {
|
||||
func (r *Layer2Relayer) handleL2GasOracleConfirmLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cfm := <-r.gasOracleSender.ConfirmChan():
|
||||
r.handleConfirmation(cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) handleL2RollupRelayerConfirmLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -682,8 +717,6 @@ func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) {
|
||||
r.handleConfirmation(cfm)
|
||||
case cfm := <-r.finalizeSender.ConfirmChan():
|
||||
r.handleConfirmation(cfm)
|
||||
case cfm := <-r.gasOracleSender.ConfirmChan():
|
||||
r.handleConfirmation(cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ func setupL2RelayerDB(t *testing.T) *gorm.DB {
|
||||
func testCreateNewRelayer(t *testing.T) {
|
||||
db := setupL2RelayerDB(t)
|
||||
defer database.CloseDB(db)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, relayer)
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func testL2RelayerProcessPendingBatches(t *testing.T) {
|
||||
defer database.CloseDB(db)
|
||||
|
||||
l2Cfg := cfg.L2Config
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
l2BlockOrm := orm.NewL2Block(db)
|
||||
@@ -82,7 +82,7 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
|
||||
defer database.CloseDB(db)
|
||||
|
||||
l2Cfg := cfg.L2Config
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
batchMeta := &types.BatchMeta{
|
||||
StartChunkIndex: 0,
|
||||
@@ -128,7 +128,7 @@ func testL2RelayerFinalizeTimeoutBatches(t *testing.T) {
|
||||
l2Cfg := cfg.L2Config
|
||||
l2Cfg.RelayerConfig.EnableTestEnvBypassFeatures = true
|
||||
l2Cfg.RelayerConfig.FinalizeBatchWithoutProofTimeoutSec = 0
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
batchMeta := &types.BatchMeta{
|
||||
StartChunkIndex: 0,
|
||||
@@ -160,7 +160,7 @@ func testL2RelayerCommitConfirm(t *testing.T) {
|
||||
l2Cfg := cfg.L2Config
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Simulate message confirmations.
|
||||
@@ -214,7 +214,7 @@ func testL2RelayerFinalizeConfirm(t *testing.T) {
|
||||
l2Cfg := cfg.L2Config
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Simulate message confirmations.
|
||||
@@ -287,7 +287,7 @@ func testL2RelayerGasOracleConfirm(t *testing.T) {
|
||||
l2Cfg := cfg.L2Config
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, nil)
|
||||
l2Relayer, err := NewLayer2Relayer(ctx, l2Cli, db, l2Cfg.RelayerConfig, false, ServiceTypeL2GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Simulate message confirmations.
|
||||
@@ -326,7 +326,7 @@ func testLayer2RelayerProcessGasPriceOracle(t *testing.T) {
|
||||
db := setupL2RelayerDB(t)
|
||||
defer database.CloseDB(db)
|
||||
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, ServiceTypeL2GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, relayer)
|
||||
|
||||
@@ -439,7 +439,7 @@ func testGetBatchStatusByIndex(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
cfg.L2Config.RelayerConfig.ChainMonitor.Enabled = true
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, nil)
|
||||
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig, false, ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, relayer)
|
||||
|
||||
|
||||
@@ -298,7 +298,7 @@ func (s *Sender) resetNonce(ctx context.Context) {
|
||||
s.auth.Nonce = big.NewInt(int64(nonce))
|
||||
}
|
||||
|
||||
func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
|
||||
func (s *Sender) resubmitTransaction(tx *gethTypes.Transaction, baseFee uint64) (*gethTypes.Transaction, error) {
|
||||
escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
|
||||
escalateMultipleDen := new(big.Int).SetUint64(s.config.EscalateMultipleDen)
|
||||
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)
|
||||
@@ -306,7 +306,7 @@ func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Tran
|
||||
txInfo := map[string]interface{}{
|
||||
"tx_hash": tx.Hash().String(),
|
||||
"tx_type": s.config.TxType,
|
||||
"from": auth.From.String(),
|
||||
"from": s.auth.From.String(),
|
||||
"nonce": tx.Nonce(),
|
||||
}
|
||||
|
||||
@@ -375,7 +375,7 @@ func (s *Sender) resubmitTransaction(auth *bind.TransactOpts, tx *gethTypes.Tran
|
||||
txInfo["adjusted_gas_fee_cap"] = gasFeeCap.Uint64()
|
||||
}
|
||||
|
||||
log.Info("Transaction gas adjustment details", "txInfo", txInfo)
|
||||
log.Info("Transaction gas adjustment details", "service", s.service, "name", s.name, "txInfo", txInfo)
|
||||
|
||||
nonce := tx.Nonce()
|
||||
s.metrics.resubmitTransactionTotal.WithLabelValues(s.service, s.name).Inc()
|
||||
@@ -434,7 +434,8 @@ func (s *Sender) checkPendingTransaction() {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("db transaction failed", "err", err)
|
||||
log.Error("db transaction failed after receiving confirmation", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// send confirm message
|
||||
@@ -456,10 +457,12 @@ func (s *Sender) checkPendingTransaction() {
|
||||
}
|
||||
if status == types.TxStatusConfirmedFailed {
|
||||
log.Warn("transaction already marked as failed, skipping resubmission", "hash", tx.Hash().String())
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("resubmit transaction",
|
||||
"service", s.service,
|
||||
"name", s.name,
|
||||
"hash", tx.Hash().String(),
|
||||
"from", s.auth.From.String(),
|
||||
"nonce", tx.Nonce(),
|
||||
@@ -467,7 +470,7 @@ func (s *Sender) checkPendingTransaction() {
|
||||
"currentBlockNumber", blockNumber,
|
||||
"escalateBlocks", s.config.EscalateBlocks)
|
||||
|
||||
if newTx, err := s.resubmitTransaction(s.auth, tx, baseFee); err != nil {
|
||||
if newTx, err := s.resubmitTransaction(tx, baseFee); err != nil {
|
||||
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
|
||||
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.auth.From.String(), "nonce", newTx.Nonce(), "err", err)
|
||||
} else {
|
||||
@@ -477,13 +480,14 @@ func (s *Sender) checkPendingTransaction() {
|
||||
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", tx.Hash().String(), err)
|
||||
}
|
||||
// Record the new transaction that has replaced the original one.
|
||||
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, txnToCheck.SubmitBlockNumber, dbTX); err != nil {
|
||||
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), err)
|
||||
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, blockNumber, dbTX); err != nil {
|
||||
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, previous block number: %v, current block number: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), txnToCheck.SubmitBlockNumber, blockNumber, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("db transaction failed", "err", err)
|
||||
log.Error("db transaction failed after resubmitting", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,6 +106,7 @@ func TestSender(t *testing.T) {
|
||||
t.Run("test check pending transaction tx confirmed", testCheckPendingTransactionTxConfirmed)
|
||||
t.Run("test check pending transaction resubmit tx confirmed", testCheckPendingTransactionResubmitTxConfirmed)
|
||||
t.Run("test check pending transaction replaced tx confirmed", testCheckPendingTransactionReplacedTxConfirmed)
|
||||
t.Run("test check pending transaction multiple times with only one transaction pending", testCheckPendingTransactionTxMultipleTimesWithOnlyOneTxPending)
|
||||
}
|
||||
|
||||
func testNewSender(t *testing.T) {
|
||||
@@ -219,7 +220,7 @@ func testResubmitZeroGasPriceTransaction(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tx)
|
||||
// Increase at least 1 wei in gas price, gas tip cap and gas fee cap.
|
||||
_, err = s.resubmitTransaction(s.auth, tx, 0)
|
||||
_, err = s.resubmitTransaction(tx, 0)
|
||||
assert.NoError(t, err)
|
||||
s.Stop()
|
||||
}
|
||||
@@ -278,7 +279,7 @@ func testResubmitNonZeroGasPriceTransaction(t *testing.T) {
|
||||
tx, err := s.createAndSendTx(feeData, &common.Address{}, big.NewInt(0), nil, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tx)
|
||||
_, err = s.resubmitTransaction(s.auth, tx, 0)
|
||||
_, err = s.resubmitTransaction(tx, 0)
|
||||
assert.NoError(t, err)
|
||||
s.Stop()
|
||||
}
|
||||
@@ -306,7 +307,7 @@ func testResubmitUnderpricedTransaction(t *testing.T) {
|
||||
tx, err := s.createAndSendTx(feeData, &common.Address{}, big.NewInt(0), nil, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tx)
|
||||
_, err = s.resubmitTransaction(s.auth, tx, 0)
|
||||
_, err = s.resubmitTransaction(tx, 0)
|
||||
assert.Error(t, err, "replacement transaction underpriced")
|
||||
s.Stop()
|
||||
}
|
||||
@@ -328,7 +329,7 @@ func testResubmitTransactionWithRisingBaseFee(t *testing.T) {
|
||||
// bump the basefee by 10x
|
||||
baseFeePerGas *= 10
|
||||
// resubmit and check that the gas fee has been adjusted accordingly
|
||||
newTx, err := s.resubmitTransaction(s.auth, tx, baseFeePerGas)
|
||||
newTx, err := s.resubmitTransaction(tx, baseFeePerGas)
|
||||
assert.NoError(t, err)
|
||||
|
||||
escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
|
||||
@@ -504,3 +505,46 @@ func testCheckPendingTransactionReplacedTxConfirmed(t *testing.T) {
|
||||
patchGuard.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckPendingTransactionTxMultipleTimesWithOnlyOneTxPending(t *testing.T) {
|
||||
for _, txType := range txTypes {
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(sqlDB))
|
||||
|
||||
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
|
||||
cfgCopy.TxType = txType
|
||||
cfgCopy.EscalateBlocks = 0
|
||||
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", types.SenderTypeCommitBatch, db, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = s.SendTransaction("test", &common.Address{}, big.NewInt(0), nil, 0)
|
||||
assert.NoError(t, err)
|
||||
|
||||
txs, err := s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 1)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, txs, 1)
|
||||
assert.Equal(t, types.TxStatusPending, txs[0].Status)
|
||||
assert.Equal(t, types.SenderTypeCommitBatch, txs[0].SenderType)
|
||||
|
||||
patchGuard := gomonkey.ApplyMethodFunc(s.client, "TransactionReceipt", func(_ context.Context, hash common.Hash) (*gethTypes.Receipt, error) {
|
||||
return nil, fmt.Errorf("simulated transaction receipt error")
|
||||
})
|
||||
|
||||
for i := 1; i <= 6; i++ {
|
||||
s.checkPendingTransaction()
|
||||
assert.NoError(t, err)
|
||||
|
||||
txs, err = s.pendingTransactionOrm.GetPendingOrReplacedTransactionsBySenderType(context.Background(), s.senderType, 100)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, txs, i+1)
|
||||
for j := 0; j < i; j++ {
|
||||
assert.Equal(t, types.TxStatusReplaced, txs[j].Status)
|
||||
}
|
||||
assert.Equal(t, types.TxStatusPending, txs[i].Status)
|
||||
}
|
||||
|
||||
s.Stop()
|
||||
patchGuard.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ func testImportL1GasPrice(t *testing.T) {
|
||||
l1Cfg := rollupApp.Config.L1Config
|
||||
|
||||
// Create L1Relayer
|
||||
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig, nil)
|
||||
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig, relayer.ServiceTypeL1GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Create L1Watcher
|
||||
@@ -67,7 +67,7 @@ func testImportL2GasPrice(t *testing.T) {
|
||||
prepareContracts(t)
|
||||
|
||||
l2Cfg := rollupApp.Config.L2Config
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, nil)
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, relayer.ServiceTypeL2GasOracle, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// add fake chunk
|
||||
|
||||
@@ -27,7 +27,7 @@ func testCommitAndFinalizeGenesisBatch(t *testing.T) {
|
||||
prepareContracts(t)
|
||||
|
||||
l2Cfg := rollupApp.Config.L2Config
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, true, nil)
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, true, relayer.ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, l2Relayer)
|
||||
|
||||
@@ -56,7 +56,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
|
||||
|
||||
// Create L2Relayer
|
||||
l2Cfg := rollupApp.Config.L2Config
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, nil)
|
||||
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig, false, relayer.ServiceTypeL2RollupRelayer, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Create L1Watcher
|
||||
|
||||
Reference in New Issue
Block a user