mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
Add Handle confirm process.
This commit is contained in:
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
@@ -65,14 +67,48 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
|
||||
confirmationCh: sender.ConfirmChan(),
|
||||
}
|
||||
|
||||
if err = layer1.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer1 submitted tx", "err", err)
|
||||
// Deal with broken transactions.
|
||||
if err = layer1.prepare(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return layer1, nil
|
||||
}
|
||||
|
||||
// prepare to run check logic and until it's finished.
|
||||
func (r *Layer1Relayer) prepare(ctx context.Context) error {
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cfm := <-r.confirmationCh:
|
||||
if !cfm.IsSuccessful {
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, orm.MsgConfirmed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer1 submitted tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait forever util sender is empty.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.sender.PendingCount() == 0
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) checkSubmittedMessages() error {
|
||||
var blockNumber uint64
|
||||
BEGIN:
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/database"
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
@@ -78,24 +80,51 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err = layer2.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer2 submitted tx", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = layer2.checkCommittingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 committed tx", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = layer2.checkFinalizingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 finalized tx", "err", err)
|
||||
// Deal with broken transactions.
|
||||
if err = layer2.prepare(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return layer2, nil
|
||||
}
|
||||
|
||||
// prepare to run check logic and until it's finished.
|
||||
func (r *Layer2Relayer) prepare(ctx context.Context) error {
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case confirmation := <-r.messageCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case confirmation := <-r.rollupCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer2 submitted tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.checkCommittingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 committed tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.checkFinalizingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 finalized tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait forever until message sender and roller sender are empty.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.messageSender.PendingCount() == 0 && r.rollupSender.PendingCount() == 0
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer2Relayer) Start() {
|
||||
loop := func(ctx context.Context, f func()) {
|
||||
|
||||
@@ -150,6 +150,11 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
|
||||
return sender, nil
|
||||
}
|
||||
|
||||
// PendingCount return the current pending txs num.
|
||||
func (s *Sender) PendingCount() int64 {
|
||||
return atomic.LoadInt64(&s.pendingNum)
|
||||
}
|
||||
|
||||
// PendingLimit return the maximum pendingTxs can handle.
|
||||
func (s *Sender) PendingLimit() int64 {
|
||||
return s.config.PendingLimit
|
||||
|
||||
@@ -4,7 +4,7 @@ import "time"
|
||||
|
||||
// TryTimes try run several times until the function return true.
|
||||
func TryTimes(times int, run func() bool) {
|
||||
for i := 0; i < times; i++ {
|
||||
for i := 0; times == -1 || i < times; i++ {
|
||||
if run() {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user