mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-13 16:08:04 -05:00
Compare commits
18 Commits
prealpha-v
...
goerli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97dded9619 | ||
|
|
f48762bf33 | ||
|
|
42190feb6c | ||
|
|
56080204c5 | ||
|
|
2674dfaf69 | ||
|
|
5bffb151a3 | ||
|
|
fadaec7add | ||
|
|
16576b6f53 | ||
|
|
aa885f068f | ||
|
|
1f764a579d | ||
|
|
91ee767669 | ||
|
|
7eac41691e | ||
|
|
d9516890b0 | ||
|
|
ddb96bb732 | ||
|
|
e419dd8d5c | ||
|
|
c99c65bdfd | ||
|
|
18fd7f56a8 | ||
|
|
a319dc1cff |
@@ -72,15 +72,20 @@ func NewLayer1Relayer(ctx context.Context, ethClient *ethclient.Client, l1Confir
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending)
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending, 100)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(msgs) > 0 {
|
||||
log.Info("Processing L1 messages", "count", len(msgs))
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err = r.processSavedEvent(msg); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process event", "err", err)
|
||||
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -108,7 +113,13 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -123,6 +134,8 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer1Relayer) Start() {
|
||||
log.Info("Starting l1/relayer")
|
||||
|
||||
go func() {
|
||||
// trigger by timer
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
|
||||
@@ -81,6 +81,8 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6
|
||||
|
||||
// Start the Watcher module.
|
||||
func (w *Watcher) Start() {
|
||||
log.Info("Starting l1/watcher")
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -40,8 +40,7 @@ func newBatchProposer(cfg *config.BatchProposerConfig, orm database.OrmFactory)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *batchProposer) tryProposeBatch(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
func (w *batchProposer) tryProposeBatch() {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
@@ -50,10 +49,7 @@ func testBatchProposer(t *testing.T) {
|
||||
BatchTimeSec: 1,
|
||||
BatchBlocksLimit: 100,
|
||||
}, db)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
proposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
proposer.tryProposeBatch()
|
||||
|
||||
infos, err := db.GetUnbatchedBlocks(map[string]interface{}{},
|
||||
fmt.Sprintf("order by number ASC LIMIT %d", 100))
|
||||
|
||||
@@ -107,7 +107,8 @@ func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d ORDER BY nonce ASC LIMIT %d", batch.EndBlockNumber, processMsgLimit),
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -165,7 +166,13 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
@@ -262,7 +269,26 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
|
||||
|
||||
txID := id + "-commit"
|
||||
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
|
||||
if err != nil && err.Error() == "execution reverted: Parent batch hasn't been committed" {
|
||||
|
||||
// check parent is committing
|
||||
batches, err = r.db.GetBlockBatches(map[string]interface{}{"end_block_hash": batch.ParentHash})
|
||||
if err != nil || len(batches) == 0 {
|
||||
log.Error("Failed to get parent batch from db", "batch_id", id, "parent_hash", batch.ParentHash, "err", err)
|
||||
return
|
||||
}
|
||||
parentBatch := batches[0]
|
||||
|
||||
if parentBatch.RollupStatus >= orm.RollupCommitting {
|
||||
// retry with manual gas estimation
|
||||
gasLimit := estimateCommitBatchGas(len(data), len(layer2Batch.Blocks))
|
||||
hash, err = r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, gasLimit)
|
||||
log.Info("commitBatch tx resent with manual gas estimation ", "id", id, "index", batch.Index, "gasLimit", gasLimit, "hash", hash.String(), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
|
||||
@@ -284,9 +310,11 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
// set skipped batches in a single db operation
|
||||
if err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
@@ -366,7 +394,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
|
||||
txID := id + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
hash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
@@ -393,6 +421,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer2Relayer) Start() {
|
||||
log.Info("Starting l2/relayer")
|
||||
|
||||
go func() {
|
||||
// trigger by timer
|
||||
ticker := time.NewTicker(time.Second)
|
||||
@@ -464,3 +494,12 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
|
||||
}
|
||||
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
|
||||
}
|
||||
|
||||
func estimateCommitBatchGas(callDataLength int, numBlocks int) uint64 {
|
||||
gasLimit := uint64(0)
|
||||
gasLimit += 16 * uint64(callDataLength) // calldata cost
|
||||
gasLimit += 4*2100 + 3*22100 // fixed cost per batch
|
||||
gasLimit += 4 * 22100 * uint64(numBlocks) // cost per block in batch
|
||||
gasLimit = gasLimit * 12 / 10 // apply multiplier
|
||||
return gasLimit
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
geth "github.com/scroll-tech/go-ethereum"
|
||||
@@ -77,41 +76,91 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
|
||||
|
||||
// Start the Listening process
|
||||
func (w *WatcherClient) Start() {
|
||||
log.Info("Starting l2/watcher")
|
||||
|
||||
go func() {
|
||||
if reflect.ValueOf(w.orm).IsNil() {
|
||||
panic("must run L2 watcher with DB")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
ctx, cancel := context.WithCancel(w.ctx)
|
||||
|
||||
for ; true; <-ticker.C {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
// trace fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
default:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(w.ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.tryFetchRunningMissingBlocks(ctx, number)
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
go w.tryFetchRunningMissingBlocks(w.ctx, &wg, number)
|
||||
go w.FetchContractEvent(&wg, number)
|
||||
go w.batchProposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// event fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.FetchContractEvent(number)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// batch proposer loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
w.batchProposer.tryProposeBatch()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
<-w.stopCh
|
||||
cancel()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -123,8 +172,7 @@ func (w *WatcherClient) Stop() {
|
||||
const blockTracesFetchLimit = uint64(10)
|
||||
|
||||
// try fetch missing blocks if inconsistent
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
|
||||
// Get newest block in DB. must have blocks at that time.
|
||||
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
|
||||
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
|
||||
@@ -181,8 +229,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
|
||||
const contractEventsBlocksFetchLimit = int64(10)
|
||||
|
||||
// FetchContractEvent pull latest event logs from given contract address and save in DB
|
||||
func (w *WatcherClient) FetchContractEvent(wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
|
||||
defer func() {
|
||||
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
|
||||
}()
|
||||
|
||||
@@ -44,7 +44,7 @@ func testCreateNewWatcherAndStop(t *testing.T) {
|
||||
numTransactions := 3
|
||||
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
for i := 0; i < numTransactions; i++ {
|
||||
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil)
|
||||
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil, 0)
|
||||
assert.NoError(t, err)
|
||||
<-newSender.ConfirmChan()
|
||||
}
|
||||
|
||||
@@ -154,18 +154,21 @@ func (s *Sender) NumberOfAccounts() int {
|
||||
return len(s.auths.accounts)
|
||||
}
|
||||
|
||||
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte) (*FeeData, error) {
|
||||
// estimate gas limit
|
||||
gasLimit, err := s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (*FeeData, error) {
|
||||
if gasLimit == 0 {
|
||||
// estimate gas limit
|
||||
var err error
|
||||
gasLimit, err = s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
|
||||
}
|
||||
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
|
||||
|
||||
// @todo change it when Scroll enable EIP1559
|
||||
if s.config.TxType != DynamicFeeTxType {
|
||||
// estimate gas price
|
||||
var gasPrice *big.Int
|
||||
gasPrice, err = s.client.SuggestGasPrice(s.ctx)
|
||||
gasPrice, err := s.client.SuggestGasPrice(s.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -189,7 +192,7 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
|
||||
}
|
||||
|
||||
// SendTransaction send a signed L2tL1 transaction.
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (hash common.Hash, err error) {
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
@@ -213,9 +216,10 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
tx *types.Transaction
|
||||
)
|
||||
// estimate gas fee
|
||||
if feeData, err = s.getFeeData(auth, target, value, data); err != nil {
|
||||
if feeData, err = s.getFeeData(auth, target, value, data, gasLimit); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if tx, err = s.createAndSendTx(auth, feeData, target, value, data, nil); err == nil {
|
||||
// add pending transaction to queue
|
||||
pending := &PendingTransaction{
|
||||
|
||||
@@ -87,7 +87,7 @@ func testBatchSender(t *testing.T, batchSize int) {
|
||||
for i := 0; i < TXBatch; i++ {
|
||||
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
id := strconv.Itoa(i + index*1000)
|
||||
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil)
|
||||
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil, 0)
|
||||
if errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
<-time.After(time.Second)
|
||||
continue
|
||||
|
||||
@@ -26,7 +26,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4)
|
||||
wg.Add(3)
|
||||
|
||||
prepareContracts(t)
|
||||
|
||||
@@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
|
||||
}
|
||||
|
||||
// l2 watch process events
|
||||
l2Watcher.FetchContractEvent(&wg, sendReceipt.BlockNumber.Uint64())
|
||||
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
|
||||
|
||||
// check db status
|
||||
msg, err := db.GetL2MessageByNonce(nonce.Uint64())
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "prealpha-v11.6"
|
||||
var tag = "prealpha-v11.14"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -330,6 +330,10 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
m.mu.Lock()
|
||||
defer func() {
|
||||
// TODO: remove the clean-up, rollers report healthy status.
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
|
||||
@@ -20,7 +20,7 @@ create table l1_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l1_message_hash_uindex
|
||||
on l1_message (msg_hash);
|
||||
|
||||
@@ -21,7 +21,7 @@ create table l2_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l2_message_hash_uindex
|
||||
on l2_message (msg_hash);
|
||||
|
||||
@@ -260,7 +260,7 @@ func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
|
||||
row := o.db.QueryRowx(`SELECT * FROM block_batch WHERE rollup_status = $1 OR rollup_status = $2 ORDER BY index DESC;`, RollupFinalized, RollupFinalizationSkipped)
|
||||
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, RollupFinalized)
|
||||
batch := &BlockBatch{}
|
||||
if err := row.StructScan(batch); err != nil {
|
||||
return nil, err
|
||||
@@ -407,7 +407,16 @@ func (o *blockBatchOrm) GetAssignedBatchIDs() ([]string, error) {
|
||||
return ids, rows.Close()
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) UpdateSkippedBatches() error {
|
||||
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
|
||||
return err
|
||||
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
|
||||
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
count, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
@@ -28,6 +28,9 @@ const (
|
||||
|
||||
// MsgFailed represents the from_layer message status is failed
|
||||
MsgFailed
|
||||
|
||||
// MsgExpired represents the from_layer message status is expired
|
||||
MsgExpired
|
||||
)
|
||||
|
||||
// L1Message is structure of stored layer1 bridge message
|
||||
@@ -151,7 +154,7 @@ type BlockBatchOrm interface {
|
||||
UpdateCommitTxHashAndRollupStatus(ctx context.Context, id string, commitTxHash string, status RollupStatus) error
|
||||
UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, id string, finalizeTxHash string, status RollupStatus) error
|
||||
GetAssignedBatchIDs() ([]string, error)
|
||||
UpdateSkippedBatches() error
|
||||
UpdateSkippedBatches() (int64, error)
|
||||
|
||||
GetCommitTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
GetFinalizeTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
@@ -161,7 +164,7 @@ type BlockBatchOrm interface {
|
||||
type L1MessageOrm interface {
|
||||
GetL1MessageByNonce(nonce uint64) (*L1Message, error)
|
||||
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
|
||||
GetL1ProcessedNonce() (int64, error)
|
||||
SaveL1Messages(ctx context.Context, messages []*L1Message) error
|
||||
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error
|
||||
|
||||
@@ -45,8 +45,8 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
|
||||
}
|
||||
|
||||
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC;`, status)
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user