mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 07:28:08 -05:00
Compare commits
39 Commits
bump-versi
...
maskpp/res
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f9530faf2 | ||
|
|
ce7962a1f2 | ||
|
|
20e345453e | ||
|
|
8548d6b25d | ||
|
|
d0cd0f654d | ||
|
|
2422bb8953 | ||
|
|
db957af22d | ||
|
|
a6ff1d01dd | ||
|
|
6aa02874c0 | ||
|
|
714343c2b8 | ||
|
|
a26cc23c9a | ||
|
|
7f1eddb6f1 | ||
|
|
40f703e87d | ||
|
|
7bd80f1656 | ||
|
|
a7a4115fb6 | ||
|
|
6c3f506e98 | ||
|
|
b589ebeaba | ||
|
|
631915480b | ||
|
|
ee15881a04 | ||
|
|
80058c55d7 | ||
|
|
fa0c8f1ba2 | ||
|
|
fd65128ec1 | ||
|
|
fd17b869cd | ||
|
|
3297e1ef9c | ||
|
|
828b0592f7 | ||
|
|
0265e1a1fe | ||
|
|
ddb6bf36f4 | ||
|
|
bce93a020d | ||
|
|
5a336bd5a9 | ||
|
|
857442fb6b | ||
|
|
1fc0477f43 | ||
|
|
c17f62718e | ||
|
|
5c9dd1bd32 | ||
|
|
e676f5e44a | ||
|
|
0156e42e18 | ||
|
|
7ee7f93f3f | ||
|
|
da096f8dd2 | ||
|
|
5ca3d1a03e | ||
|
|
a00d3e9a69 |
@@ -16,7 +16,8 @@
|
||||
"escalate_multiple_den": 10,
|
||||
"max_gas_price": 10000000000,
|
||||
"tx_type": "LegacyTx",
|
||||
"min_balance": 100000000000000000000
|
||||
"min_balance": 100000000000000000000,
|
||||
"pending_limit": 500
|
||||
},
|
||||
"message_sender_private_keys": [
|
||||
"1212121212121212121212121212121212121212121212121212121212121212"
|
||||
@@ -39,7 +40,8 @@
|
||||
"escalate_multiple_den": 10,
|
||||
"max_gas_price": 10000000000,
|
||||
"tx_type": "LegacyTx",
|
||||
"min_balance": 100000000000000000000
|
||||
"min_balance": 100000000000000000000,
|
||||
"pending_limit": 500
|
||||
},
|
||||
"message_sender_private_keys": [
|
||||
"1212121212121212121212121212121212121212121212121212121212121212"
|
||||
|
||||
@@ -30,7 +30,8 @@ type SenderConfig struct {
|
||||
// The transaction type to use: LegacyTx, AccessListTx, DynamicFeeTx
|
||||
TxType string `json:"tx_type"`
|
||||
// The min balance set for check and set balance for sender's accounts.
|
||||
MinBalance *big.Int `json:"min_balance,omitempty"`
|
||||
MinBalance *big.Int `json:"min_balance,omitempty"`
|
||||
PendingLimit int64 `json:"pending_limit,omitempty"`
|
||||
}
|
||||
|
||||
// RelayerConfig loads relayer configuration items.
|
||||
|
||||
@@ -3,15 +3,17 @@ package l1
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
// not sure if this will make problems when relay with l1geth
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
@@ -55,7 +57,7 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Layer1Relayer{
|
||||
layer1 := &Layer1Relayer{
|
||||
ctx: ctx,
|
||||
sender: sender,
|
||||
db: db,
|
||||
@@ -63,7 +65,89 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
|
||||
cfg: cfg,
|
||||
stopCh: make(chan struct{}),
|
||||
confirmationCh: sender.ConfirmChan(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Deal with broken transactions.
|
||||
if err = layer1.prepare(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return layer1, nil
|
||||
}
|
||||
|
||||
// prepare to run check logic and until it's finished.
|
||||
func (r *Layer1Relayer) prepare(ctx context.Context) error {
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cfm := <-r.confirmationCh:
|
||||
if !cfm.IsSuccessful {
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, orm.MsgConfirmed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer1 submitted tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait forever util sender is empty.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.sender.PendingCount() == 0
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) checkSubmittedMessages() error {
|
||||
var blockNumber uint64
|
||||
BEGIN:
|
||||
msgs, err := r.db.GetL1Messages(
|
||||
map[string]interface{}{"status": orm.MsgSubmitted},
|
||||
fmt.Sprintf("AND height > %d", blockNumber),
|
||||
fmt.Sprintf("ORDER BY height ASC LIMIT %d", 100),
|
||||
)
|
||||
if err != nil || len(msgs) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
for msg := msgs[0]; len(msgs) > 0; { //nolint:staticcheck
|
||||
// If pending txs pool is full, wait a while and retry.
|
||||
if r.sender.IsFull() {
|
||||
log.Warn("layer1 sender pending tx reaches pending limit")
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
continue
|
||||
}
|
||||
msg, msgs = msgs[0], msgs[1:]
|
||||
|
||||
blockNumber = mathutil.MaxUint64(blockNumber, msg.Height)
|
||||
|
||||
data, err := r.packRelayMessage(msg)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
err = r.sender.LoadOrSendTx(
|
||||
common.HexToHash(msg.Layer2Hash),
|
||||
msg.MsgHash,
|
||||
&r.cfg.MessengerContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to load or send l1 submitted tx", "msg hash", msg.MsgHash, "err", err)
|
||||
}
|
||||
}
|
||||
goto BEGIN
|
||||
}
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
@@ -89,7 +173,7 @@ func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
func (r *Layer1Relayer) packRelayMessage(msg *orm.L1Message) ([]byte, error) {
|
||||
// @todo add support to relay multiple messages
|
||||
from := common.HexToAddress(msg.Sender)
|
||||
target := common.HexToAddress(msg.Target)
|
||||
@@ -107,9 +191,16 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessage", "msg.nonce", msg.Nonce, "msg.height", msg.Height, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
data, err := r.packRelayMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
|
||||
@@ -2,28 +2,20 @@ package l2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// not sure if this will make problems when relay with l1geth
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/database"
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/config"
|
||||
"scroll-tech/bridge/sender"
|
||||
"scroll-tech/bridge/utils"
|
||||
)
|
||||
|
||||
// Layer2Relayer is responsible for
|
||||
@@ -38,13 +30,11 @@ type Layer2Relayer struct {
|
||||
db database.OrmFactory
|
||||
cfg *config.RelayerConfig
|
||||
|
||||
messageSender *sender.Sender
|
||||
messageCh <-chan *sender.Confirmation
|
||||
l1MessengerABI *abi.ABI
|
||||
messageSender *sender.Sender
|
||||
messageCh <-chan *sender.Confirmation
|
||||
|
||||
rollupSender *sender.Sender
|
||||
rollupCh <-chan *sender.Confirmation
|
||||
l1RollupABI *abi.ABI
|
||||
|
||||
// A list of processing message.
|
||||
// key(string): confirmation ID, value(string): layer2 hash.
|
||||
@@ -76,326 +66,65 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Layer2Relayer{
|
||||
layer2 := &Layer2Relayer{
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
messageSender: messageSender,
|
||||
messageCh: messageSender.ConfirmChan(),
|
||||
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
|
||||
rollupSender: rollupSender,
|
||||
rollupCh: rollupSender.ConfirmChan(),
|
||||
l1RollupABI: bridge_abi.RollupMetaABI,
|
||||
cfg: cfg,
|
||||
processingMessage: sync.Map{},
|
||||
processingCommitment: sync.Map{},
|
||||
processingFinalization: sync.Map{},
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Deal with broken transactions.
|
||||
if err = layer2.prepare(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return layer2, nil
|
||||
}
|
||||
|
||||
const processMsgLimit = 100
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer2Relayer) ProcessSavedEvents() {
|
||||
batch, err := r.db.GetLatestFinalizedBatch()
|
||||
if err != nil {
|
||||
log.Error("GetLatestFinalizedBatch failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// process messages in batches
|
||||
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
|
||||
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
|
||||
if size = len(msgs); size > batchSize {
|
||||
size = batchSize
|
||||
}
|
||||
var g errgroup.Group
|
||||
for _, msg := range msgs[:size] {
|
||||
msg := msg
|
||||
g.Go(func() error {
|
||||
return r.processSavedEvent(msg, batch.Index)
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process l2 saved event", "err", err)
|
||||
// prepare to run check logic and until it's finished.
|
||||
func (r *Layer2Relayer) prepare(ctx context.Context) error {
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case confirmation := <-r.messageCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case confirmation := <-r.rollupCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) error {
|
||||
// @todo fetch merkle proof from l2geth
|
||||
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
|
||||
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
|
||||
BlockHeight: big.NewInt(int64(msg.Height)),
|
||||
BatchIndex: big.NewInt(0).SetUint64(index),
|
||||
MerkleProof: make([]byte, 0),
|
||||
}
|
||||
from := common.HexToAddress(msg.Sender)
|
||||
target := common.HexToAddress(msg.Target)
|
||||
value, ok := big.NewInt(0).SetString(msg.Value, 10)
|
||||
if !ok {
|
||||
// @todo maybe panic?
|
||||
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
}
|
||||
fee, _ := big.NewInt(0).SetString(msg.Fee, 10)
|
||||
deadline := big.NewInt(int64(msg.Deadline))
|
||||
msgNonce := big.NewInt(int64(msg.Nonce))
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, fee, deadline, msgNonce, calldata, proof)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer2 submitted tx", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
if err := r.checkCommittingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 committed tx", "err", err)
|
||||
return err
|
||||
}
|
||||
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
|
||||
|
||||
// save status in db
|
||||
// @todo handle db error
|
||||
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, orm.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
|
||||
if err := r.checkFinalizingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 finalized tx", "err", err)
|
||||
return err
|
||||
}
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
|
||||
// Wait forever until message sender and roller sender are empty.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.messageSender.PendingCount() == 0 && r.rollupSender.PendingCount() == 0
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessPendingBatches submit batch data to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessPendingBatches() {
|
||||
// batches are sorted by batch index in increasing order
|
||||
batchesInDB, err := r.db.GetPendingBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch pending L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batchesInDB) == 0 {
|
||||
return
|
||||
}
|
||||
id := batchesInDB[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
batches, err := r.db.GetBlockBatches(map[string]interface{}{"id": id})
|
||||
if err != nil || len(batches) == 0 {
|
||||
log.Error("Failed to GetBlockBatches", "batch_id", id, "err", err)
|
||||
return
|
||||
}
|
||||
batch := batches[0]
|
||||
|
||||
traces, err := r.db.GetBlockTraces(map[string]interface{}{"batch_id": id}, "ORDER BY number ASC")
|
||||
if err != nil || len(traces) == 0 {
|
||||
log.Error("Failed to GetBlockTraces", "batch_id", id, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
layer2Batch := &bridge_abi.IZKRollupLayer2Batch{
|
||||
BatchIndex: batch.Index,
|
||||
ParentHash: common.HexToHash(batch.ParentHash),
|
||||
Blocks: make([]bridge_abi.IZKRollupLayer2BlockHeader, len(traces)),
|
||||
}
|
||||
|
||||
parentHash := common.HexToHash(batch.ParentHash)
|
||||
for i, trace := range traces {
|
||||
layer2Batch.Blocks[i] = bridge_abi.IZKRollupLayer2BlockHeader{
|
||||
BlockHash: trace.Header.Hash(),
|
||||
ParentHash: parentHash,
|
||||
BaseFee: trace.Header.BaseFee,
|
||||
StateRoot: trace.StorageTrace.RootAfter,
|
||||
BlockHeight: trace.Header.Number.Uint64(),
|
||||
GasUsed: 0,
|
||||
Timestamp: trace.Header.Time,
|
||||
ExtraData: make([]byte, 0),
|
||||
Txs: make([]bridge_abi.IZKRollupLayer2Transaction, len(trace.Transactions)),
|
||||
}
|
||||
for j, tx := range trace.Transactions {
|
||||
layer2Batch.Blocks[i].Txs[j] = bridge_abi.IZKRollupLayer2Transaction{
|
||||
Caller: tx.From,
|
||||
Nonce: tx.Nonce,
|
||||
Gas: tx.Gas,
|
||||
GasPrice: tx.GasPrice.ToInt(),
|
||||
Value: tx.Value.ToInt(),
|
||||
Data: common.Hex2Bytes(tx.Data),
|
||||
R: tx.R.ToInt(),
|
||||
S: tx.S.ToInt(),
|
||||
V: tx.V.ToInt().Uint64(),
|
||||
}
|
||||
if tx.To != nil {
|
||||
layer2Batch.Blocks[i].Txs[j].Target = *tx.To
|
||||
}
|
||||
layer2Batch.Blocks[i].GasUsed += trace.ExecutionResults[j].Gas
|
||||
}
|
||||
|
||||
// for next iteration
|
||||
parentHash = layer2Batch.Blocks[i].BlockHash
|
||||
}
|
||||
|
||||
data, err := r.l1RollupABI.Pack("commitBatch", layer2Batch)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack commitBatch", "id", id, "index", batch.Index, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-commit"
|
||||
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("commitBatch in layer1", "batch_id", id, "index", batch.Index, "hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupCommitting)
|
||||
if err != nil {
|
||||
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
|
||||
}
|
||||
r.processingCommitment.Store(txID, id)
|
||||
}
|
||||
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
// set skipped batches in a single db operation
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
batches, err := r.db.GetCommittedBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batches) == 0 {
|
||||
return
|
||||
}
|
||||
id := batches[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
status, err := r.db.GetProvingStatusByID(id)
|
||||
if err != nil {
|
||||
log.Error("GetProvingStatusByID failed", "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch status {
|
||||
case orm.ProvingTaskUnassigned, orm.ProvingTaskAssigned:
|
||||
// The proof for this block is not ready yet.
|
||||
return
|
||||
|
||||
case orm.ProvingTaskProved:
|
||||
// It's an intermediate state. The roller manager received the proof but has not verified
|
||||
// the proof yet. We don't roll up the proof until it's verified.
|
||||
return
|
||||
|
||||
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
|
||||
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
|
||||
}
|
||||
|
||||
case orm.ProvingTaskVerified:
|
||||
log.Info("Start to roll up zk proof", "id", id)
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
// TODO: need to revisit this and have a more fine-grained error handling
|
||||
if !success {
|
||||
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "id", id)
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByID(id)
|
||||
if err != nil {
|
||||
log.Warn("fetch get proof by id failed", "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
if proofBuffer == nil || instanceBuffer == nil {
|
||||
log.Warn("proof or instance not ready", "id", id)
|
||||
return
|
||||
}
|
||||
if len(proofBuffer)%32 != 0 {
|
||||
log.Error("proof buffer has wrong length", "id", id, "length", len(proofBuffer))
|
||||
return
|
||||
}
|
||||
if len(instanceBuffer)%32 != 0 {
|
||||
log.Warn("instance buffer has wrong length", "id", id, "length", len(instanceBuffer))
|
||||
return
|
||||
}
|
||||
|
||||
proof := utils.BufferToUint256Le(proofBuffer)
|
||||
instance := utils.BufferToUint256Le(instanceBuffer)
|
||||
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(id), proof, instance)
|
||||
if err != nil {
|
||||
log.Error("Pack finalizeBatchWithProof failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("finalizeBatchWithProof in layer1 failed", "id", id, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("finalizeBatchWithProof in layer1", "batch_id", id, "hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupFinalizing)
|
||||
if err != nil {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
|
||||
}
|
||||
success = true
|
||||
r.processingFinalization.Store(txID, id)
|
||||
|
||||
default:
|
||||
log.Error("encounter unreachable case in ProcessCommittedBatches",
|
||||
"block_status", status,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer2Relayer) Start() {
|
||||
loop := func(ctx context.Context, f func()) {
|
||||
|
||||
171
bridge/l2/relayer_commit.go
Normal file
171
bridge/l2/relayer_commit.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/sender"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
)
|
||||
|
||||
func (r *Layer2Relayer) checkCommittingBatches() error {
|
||||
var batchIndex uint64
|
||||
BEGIN:
|
||||
batches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{"rollup_status": orm.RollupCommitting},
|
||||
fmt.Sprintf("AND index > %d", batchIndex),
|
||||
fmt.Sprintf("ORDER BY index ASC LIMIT %d", 10),
|
||||
)
|
||||
if err != nil || len(batches) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
for batch := batches[0]; len(batches) > 0; { //nolint:staticcheck
|
||||
// If pending txs pool is full, wait a while and retry.
|
||||
if r.rollupSender.IsFull() {
|
||||
log.Warn("layer2 rollup sender pending committed tx reaches pending limit")
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
continue
|
||||
}
|
||||
batch, batches = batches[0], batches[1:]
|
||||
|
||||
id := batch.ID
|
||||
batchIndex = mathutil.MaxUint64(batchIndex, batch.Index)
|
||||
|
||||
txStr, err := r.db.GetCommitTxHash(id)
|
||||
if err != nil {
|
||||
log.Error("failed to get commit_tx_hash from block_batch", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
_, data, err := r.packCommitBatch(id)
|
||||
if err != nil {
|
||||
log.Error("failed to load or send committed tx", "batch id", id, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
txID := id + "-commit"
|
||||
err = r.rollupSender.LoadOrSendTx(
|
||||
common.HexToHash(txStr.String),
|
||||
txID,
|
||||
&r.cfg.RollupContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to load or send tx", "batch id", id, "err", err)
|
||||
} else {
|
||||
r.processingCommitment.Store(txID, id)
|
||||
}
|
||||
}
|
||||
goto BEGIN
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packCommitBatch(id string) (*orm.BlockBatch, []byte, error) {
|
||||
batches, err := r.db.GetBlockBatches(map[string]interface{}{"id": id})
|
||||
if err != nil || len(batches) == 0 {
|
||||
log.Error("Failed to GetBlockBatches", "batch_id", id, "err", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
batch := batches[0]
|
||||
|
||||
traces, err := r.db.GetBlockTraces(map[string]interface{}{"batch_id": id}, "ORDER BY number ASC")
|
||||
if err != nil || len(traces) == 0 {
|
||||
log.Error("Failed to GetBlockTraces", "batch_id", id, "err", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
layer2Batch := &bridge_abi.IZKRollupLayer2Batch{
|
||||
BatchIndex: batch.Index,
|
||||
ParentHash: common.HexToHash(batch.ParentHash),
|
||||
Blocks: make([]bridge_abi.IZKRollupLayer2BlockHeader, len(traces)),
|
||||
}
|
||||
|
||||
parentHash := common.HexToHash(batch.ParentHash)
|
||||
for i, trace := range traces {
|
||||
layer2Batch.Blocks[i] = bridge_abi.IZKRollupLayer2BlockHeader{
|
||||
BlockHash: trace.Header.Hash(),
|
||||
ParentHash: parentHash,
|
||||
BaseFee: trace.Header.BaseFee,
|
||||
StateRoot: trace.StorageTrace.RootAfter,
|
||||
BlockHeight: trace.Header.Number.Uint64(),
|
||||
GasUsed: 0,
|
||||
Timestamp: trace.Header.Time,
|
||||
ExtraData: make([]byte, 0),
|
||||
Txs: make([]bridge_abi.IZKRollupLayer2Transaction, len(trace.Transactions)),
|
||||
}
|
||||
for j, tx := range trace.Transactions {
|
||||
layer2Batch.Blocks[i].Txs[j] = bridge_abi.IZKRollupLayer2Transaction{
|
||||
Caller: tx.From,
|
||||
Nonce: tx.Nonce,
|
||||
Gas: tx.Gas,
|
||||
GasPrice: tx.GasPrice.ToInt(),
|
||||
Value: tx.Value.ToInt(),
|
||||
Data: common.Hex2Bytes(tx.Data),
|
||||
R: tx.R.ToInt(),
|
||||
S: tx.S.ToInt(),
|
||||
V: tx.V.ToInt().Uint64(),
|
||||
}
|
||||
if tx.To != nil {
|
||||
layer2Batch.Blocks[i].Txs[j].Target = *tx.To
|
||||
}
|
||||
layer2Batch.Blocks[i].GasUsed += trace.ExecutionResults[j].Gas
|
||||
}
|
||||
|
||||
// for next iteration
|
||||
parentHash = layer2Batch.Blocks[i].BlockHash
|
||||
}
|
||||
|
||||
data, err := bridge_abi.RollupMetaABI.Pack("commitBatch", layer2Batch)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack commitBatch", "id", id, "index", batch.Index, "err", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return batch, data, nil
|
||||
}
|
||||
|
||||
// ProcessPendingBatches submit batch data to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessPendingBatches() {
|
||||
// batches are sorted by batch index in increasing order
|
||||
batchesInDB, err := r.db.GetPendingBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch pending L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batchesInDB) == 0 {
|
||||
return
|
||||
}
|
||||
id := batchesInDB[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
batch, data, err := r.packCommitBatch(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-commit"
|
||||
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("commitBatch in layer1", "batch_id", id, "index", batch.Index, "hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupCommitting)
|
||||
if err != nil {
|
||||
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
|
||||
}
|
||||
r.processingCommitment.Store(txID, id)
|
||||
}
|
||||
195
bridge/l2/relayer_finalize.go
Normal file
195
bridge/l2/relayer_finalize.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/sender"
|
||||
"scroll-tech/bridge/utils"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
)
|
||||
|
||||
func (r *Layer2Relayer) checkFinalizingBatches() error {
|
||||
var (
|
||||
batchLimit = 10
|
||||
batchIndex uint64
|
||||
)
|
||||
BEGIN:
|
||||
batches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{"rollup_status": orm.RollupFinalizing},
|
||||
fmt.Sprintf("AND index > %d", batchIndex),
|
||||
fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit),
|
||||
)
|
||||
if err != nil || len(batches) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
for batch := batches[0]; len(batches) > 0; { //nolint:staticcheck
|
||||
// If pending txs pool is full, wait a while and retry.
|
||||
if r.rollupSender.IsFull() {
|
||||
log.Warn("layer2 rollup sender pending finalized tx reaches pending limit")
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
continue
|
||||
}
|
||||
batch, batches = batches[0], batches[1:]
|
||||
|
||||
id := batch.ID
|
||||
batchIndex = mathutil.MaxUint64(batchIndex, batch.Index)
|
||||
|
||||
txStr, err := r.db.GetFinalizeTxHash(id)
|
||||
if err != nil {
|
||||
log.Error("failed to get finalize_tx_hash from block_batch", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := r.packFinalizeBatch(id)
|
||||
if err != nil {
|
||||
log.Error("failed to pack finalize data", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
txID := id + "-finalize"
|
||||
err = r.rollupSender.LoadOrSendTx(
|
||||
common.HexToHash(txStr.String),
|
||||
txID,
|
||||
&r.cfg.RollupContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to load or send finalized tx", "batch id", id, "err", err)
|
||||
} else {
|
||||
r.processingFinalization.Store(txID, id)
|
||||
}
|
||||
}
|
||||
goto BEGIN
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packFinalizeBatch(id string) ([]byte, error) {
|
||||
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByID(id)
|
||||
if err != nil {
|
||||
log.Warn("fetch get proof by id failed", "id", id, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
if proofBuffer == nil || instanceBuffer == nil {
|
||||
log.Warn("proof or instance not ready", "id", id)
|
||||
return nil, err
|
||||
}
|
||||
if len(proofBuffer)%32 != 0 {
|
||||
log.Error("proof buffer has wrong length", "id", id, "length", len(proofBuffer))
|
||||
return nil, err
|
||||
}
|
||||
if len(instanceBuffer)%32 != 0 {
|
||||
log.Warn("instance buffer has wrong length", "id", id, "length", len(instanceBuffer))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proof := utils.BufferToUint256Le(proofBuffer)
|
||||
instance := utils.BufferToUint256Le(instanceBuffer)
|
||||
data, err := bridge_abi.RollupMetaABI.Pack("finalizeBatchWithProof", common.HexToHash(id), proof, instance)
|
||||
if err != nil {
|
||||
log.Error("Pack finalizeBatchWithProof failed", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
// set skipped batches in a single db operation
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
batches, err := r.db.GetCommittedBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batches) == 0 {
|
||||
return
|
||||
}
|
||||
id := batches[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
status, err := r.db.GetProvingStatusByID(id)
|
||||
if err != nil {
|
||||
log.Error("GetProvingStatusByID failed", "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch status {
|
||||
case orm.ProvingTaskUnassigned, orm.ProvingTaskAssigned:
|
||||
// The proof for this block is not ready yet.
|
||||
return
|
||||
|
||||
case orm.ProvingTaskProved:
|
||||
// It's an intermediate state. The roller manager received the proof but has not verified
|
||||
// the proof yet. We don't roll up the proof until it's verified.
|
||||
return
|
||||
|
||||
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
|
||||
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
|
||||
}
|
||||
|
||||
case orm.ProvingTaskVerified:
|
||||
log.Info("Start to roll up zk proof", "id", id)
|
||||
success := false
|
||||
|
||||
defer func() {
|
||||
// TODO: need to revisit this and have a more fine-grained error handling
|
||||
if !success {
|
||||
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "id", id)
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Pack finalize data.
|
||||
data, err := r.packFinalizeBatch(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("finalizeBatchWithProof in layer1 failed", "id", id, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("finalizeBatchWithProof in layer1", "batch_id", id, "hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupFinalizing)
|
||||
if err != nil {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
|
||||
}
|
||||
success = true
|
||||
r.processingFinalization.Store(txID, id)
|
||||
|
||||
default:
|
||||
log.Error("encounter unreachable case in ProcessCommittedBatches",
|
||||
"block_status", status,
|
||||
)
|
||||
}
|
||||
}
|
||||
183
bridge/l2/relayer_message.go
Normal file
183
bridge/l2/relayer_message.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/sender"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
)
|
||||
|
||||
const processMsgLimit = 100
|
||||
|
||||
func (r *Layer2Relayer) checkSubmittedMessages() error {
|
||||
var nonce uint64
|
||||
BEGIN:
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgSubmitted},
|
||||
fmt.Sprintf("AND nonce > %d", nonce),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
if err != nil || len(msgs) == 0 {
|
||||
return err
|
||||
}
|
||||
|
||||
var batch *orm.BlockBatch
|
||||
for msg := msgs[0]; len(msgs) > 0; { //nolint:staticcheck
|
||||
// If pending pool is full, wait a while and retry.
|
||||
if r.messageSender.IsFull() {
|
||||
log.Warn("layer2 message tx sender is full")
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
continue
|
||||
}
|
||||
msg, msgs = msgs[0], msgs[1:]
|
||||
nonce = mathutil.MaxUint64(nonce, msg.Nonce)
|
||||
|
||||
// Get batch by block number.
|
||||
if batch == nil || msg.Height < batch.StartBlockNumber || msg.Height > batch.EndBlockNumber {
|
||||
batches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{},
|
||||
fmt.Sprintf("AND start_block_number <= %d AND end_block_number >= %d", msg.Height, msg.Height),
|
||||
)
|
||||
// If get batch failed, stop and return immediately.
|
||||
if err != nil || len(batches) == 0 {
|
||||
return err
|
||||
}
|
||||
batch = batches[0]
|
||||
}
|
||||
|
||||
data, err := r.packRelayMessage(msg, batch.Index)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = r.messageSender.LoadOrSendTx(
|
||||
common.HexToHash(msg.Layer1Hash),
|
||||
msg.MsgHash,
|
||||
&r.cfg.MessengerContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to load or send l2 submitted tx", "batch id", batch.ID, "msg hash", msg.MsgHash, "err", err)
|
||||
} else {
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
}
|
||||
}
|
||||
goto BEGIN
|
||||
}
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer2Relayer) ProcessSavedEvents() {
|
||||
batch, err := r.db.GetLatestFinalizedBatch()
|
||||
if err != nil {
|
||||
log.Error("GetLatestFinalizedBatch failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// process messages in batches
|
||||
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
|
||||
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
|
||||
if size = len(msgs); size > batchSize {
|
||||
size = batchSize
|
||||
}
|
||||
var g errgroup.Group
|
||||
for _, msg := range msgs[:size] {
|
||||
msg := msg
|
||||
g.Go(func() error {
|
||||
return r.processSavedEvent(msg, batch.Index)
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process l2 saved event", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) error {
|
||||
data, err := r.packRelayMessage(msg, index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
|
||||
|
||||
// save status in db
|
||||
// @todo handle db error
|
||||
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, orm.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
|
||||
return err
|
||||
}
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packRelayMessage(msg *orm.L2Message, index uint64) ([]byte, error) {
|
||||
// @todo fetch merkle proof from l2geth
|
||||
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
|
||||
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
|
||||
BlockHeight: big.NewInt(int64(msg.Height)),
|
||||
BatchIndex: big.NewInt(0).SetUint64(index),
|
||||
MerkleProof: make([]byte, 0),
|
||||
}
|
||||
from := common.HexToAddress(msg.Sender)
|
||||
target := common.HexToAddress(msg.Target)
|
||||
value, ok := big.NewInt(0).SetString(msg.Value, 10)
|
||||
if !ok {
|
||||
// @todo maybe panic?
|
||||
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
}
|
||||
fee, _ := big.NewInt(0).SetString(msg.Fee, 10)
|
||||
deadline := big.NewInt(int64(msg.Deadline))
|
||||
msgNonce := big.NewInt(int64(msg.Nonce))
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
data, err := bridge_abi.L1MessengerMetaABI.Pack("relayMessageWithProof", from, target, value, fee, deadline, msgNonce, calldata, proof)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
@@ -88,6 +88,7 @@ type Sender struct {
|
||||
|
||||
blockNumber uint64 // Current block number on chain.
|
||||
baseFeePerGas uint64 // Current base fee per gas on chain
|
||||
pendingNum int64 // current pending tx count.
|
||||
pendingTxs sync.Map // Mapping from nonce to pending transaction
|
||||
confirmCh chan *Confirmation
|
||||
|
||||
@@ -149,6 +150,16 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
|
||||
return sender, nil
|
||||
}
|
||||
|
||||
// PendingCount return the current pending txs num.
|
||||
func (s *Sender) PendingCount() int64 {
|
||||
return atomic.LoadInt64(&s.pendingNum)
|
||||
}
|
||||
|
||||
// PendingLimit return the maximum pendingTxs can handle.
|
||||
func (s *Sender) PendingLimit() int64 {
|
||||
return s.config.PendingLimit
|
||||
}
|
||||
|
||||
// Stop stop the sender module.
|
||||
func (s *Sender) Stop() {
|
||||
close(s.stopCh)
|
||||
@@ -199,16 +210,27 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IsFull If pendingTxs pool is full return true.
|
||||
func (s *Sender) IsFull() bool {
|
||||
return atomic.LoadInt64(&s.pendingNum) == s.config.PendingLimit
|
||||
}
|
||||
|
||||
// SendTransaction send a signed L2tL1 transaction.
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
|
||||
if s.IsFull() {
|
||||
return common.Hash{}, fmt.Errorf("pending txs is full, pending size: %d", s.config.PendingLimit)
|
||||
}
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
}
|
||||
atomic.AddInt64(&s.pendingNum, 1)
|
||||
|
||||
// get
|
||||
auth := s.auths.getAccount()
|
||||
if auth == nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
return common.Hash{}, ErrNoAvailableAccount
|
||||
}
|
||||
|
||||
@@ -216,6 +238,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -243,6 +266,61 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Sender) getTxAndAddr(txHash common.Hash) (*types.Transaction, uint64, common.Address, error) {
|
||||
tx, isPending, err := s.client.TransactionByHash(s.ctx, txHash)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
|
||||
sender, err := types.Sender(types.LatestSignerForChainID(s.chainID), tx)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
|
||||
if isPending {
|
||||
return tx, s.blockNumber, sender, nil
|
||||
}
|
||||
|
||||
receipt, err := s.client.TransactionReceipt(s.ctx, txHash)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
return tx, receipt.BlockNumber.Uint64(), sender, nil
|
||||
}
|
||||
|
||||
// LoadOrSendTx If the tx already exist in chain load it or resend it.
|
||||
func (s *Sender) LoadOrSendTx(destTxHash common.Hash, ID string, target *common.Address, value *big.Int, data []byte) error {
|
||||
tx, blockNumber, from, err := s.getTxAndAddr(destTxHash)
|
||||
// If this tx already exist load it to the pending.
|
||||
if err == nil && tx != nil {
|
||||
auth := s.auths.accounts[from]
|
||||
var feeData *FeeData
|
||||
feeData, err = s.getFeeData(auth, target, value, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
}
|
||||
atomic.AddInt64(&s.pendingNum, 1)
|
||||
s.pendingTxs.Store(ID, &PendingTransaction{
|
||||
tx: tx,
|
||||
id: ID,
|
||||
signer: auth,
|
||||
// Record the transaction's block blockNumber.
|
||||
submitAt: blockNumber,
|
||||
feeData: feeData,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Tx is dropped from chain node, resend it.
|
||||
_, err = s.SendTransaction(ID, target, value, data)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, target *common.Address, value *big.Int, data []byte, overrideNonce *uint64) (tx *types.Transaction, err error) {
|
||||
var (
|
||||
nonce = auth.Nonce.Uint64()
|
||||
@@ -388,6 +466,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
if (err == nil) && (receipt != nil) {
|
||||
if receipt.BlockNumber.Uint64() <= confirmed {
|
||||
s.pendingTxs.Delete(key)
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
// send confirm message
|
||||
s.confirmCh <- &Confirmation{
|
||||
ID: pending.id,
|
||||
@@ -419,6 +498,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
if strings.Contains(err.Error(), "nonce") {
|
||||
// This key can be deleted
|
||||
s.pendingTxs.Delete(key)
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
// Try get receipt by the latest replaced tx hash
|
||||
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
|
||||
if (err == nil) && (receipt != nil) {
|
||||
|
||||
@@ -49,6 +49,7 @@ func TestSender(t *testing.T) {
|
||||
// Setup
|
||||
setupEnv(t)
|
||||
|
||||
t.Run("testLoadOrSendTx", testLoadOrSendTx)
|
||||
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
|
||||
t.Run("test 3 account sender", func(t *testing.T) { testBatchSender(t, 3) })
|
||||
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
|
||||
@@ -59,6 +60,38 @@ func TestSender(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func testLoadOrSendTx(t *testing.T) {
|
||||
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
|
||||
senderCfg.Confirmations = 0
|
||||
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
newSender2, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
id := "aaa"
|
||||
|
||||
hash, err := newSender.SendTransaction(id, &toAddr, big.NewInt(0), nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = newSender2.LoadOrSendTx(hash, id, &toAddr, big.NewInt(0), nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
select {
|
||||
case cfm := <-newSender2.ConfirmChan():
|
||||
assert.Equal(t, true, cfm.IsSuccessful)
|
||||
assert.Equal(t, hash, cfm.TxHash)
|
||||
assert.Equal(t, id, cfm.ID)
|
||||
case <-time.After(time.Second * 10):
|
||||
t.Error("testLoadOrSendTx test failed because of timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func testBatchSender(t *testing.T, batchSize int) {
|
||||
for len(privateKeys) < batchSize {
|
||||
priv, err := crypto.GenerateKey()
|
||||
|
||||
@@ -11,6 +11,6 @@ if [ ! -n "${IPC_PATH}" ];then
|
||||
IPC_PATH="/tmp/l1geth_path.ipc"
|
||||
fi
|
||||
|
||||
exec geth --mine --datadir "." --unlock 0 --password "./password" --allow-insecure-unlock --nodiscover \
|
||||
exec geth --mine --datadir "." --unlock 0 --miner.etherbase "0x1c5a77d9fa7ef466951b2f01f724bca3a5820b63" --password "./password" --allow-insecure-unlock --nodiscover \
|
||||
--http --http.addr "0.0.0.0" --http.port 8545 --ws --ws.addr "0.0.0.0" --ws.port 8546 --ipcpath ${IPC_PATH}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import "time"
|
||||
|
||||
// TryTimes try run several times until the function return true.
|
||||
func TryTimes(times int, run func() bool) {
|
||||
for i := 0; i < times; i++ {
|
||||
for i := 0; times == -1 || i < times; i++ {
|
||||
if run() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "prealpha-v13.1"
|
||||
var tag = "prealpha-v13.2"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -236,6 +236,29 @@ func (o *blockBatchOrm) BatchRecordExist(id string) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetBatchesByRollupStatus(status RollupStatus, limit uint64) ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err = rows.Scan(&id); err != nil {
|
||||
break
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if len(ids) == 0 || errors.Is(err, sql.ErrNoRows) {
|
||||
// log.Warn("no pending batches in db", "err", err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ids, rows.Close()
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, RollupPending, limit)
|
||||
if err != nil {
|
||||
|
||||
@@ -46,6 +46,7 @@ type L1Message struct {
|
||||
Target string `json:"target" db:"target"`
|
||||
Calldata string `json:"calldata" db:"calldata"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Status MsgStatus `json:"status" db:"status"`
|
||||
}
|
||||
|
||||
@@ -62,6 +63,7 @@ type L2Message struct {
|
||||
Target string `json:"target" db:"target"`
|
||||
Calldata string `json:"calldata" db:"calldata"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Status MsgStatus `json:"status" db:"status"`
|
||||
}
|
||||
|
||||
@@ -145,6 +147,7 @@ type BlockBatchOrm interface {
|
||||
ResetProvingStatusFor(before ProvingStatus) error
|
||||
NewBatchInDBTx(dbTx *sqlx.Tx, startBlock *BlockInfo, endBlock *BlockInfo, parentHash string, totalTxNum uint64, gasUsed uint64) (string, error)
|
||||
BatchRecordExist(id string) (bool, error)
|
||||
GetBatchesByRollupStatus(status RollupStatus, limit uint64) ([]string, error)
|
||||
GetPendingBatches(limit uint64) ([]string, error)
|
||||
GetCommittedBatches(limit uint64) ([]string, error)
|
||||
GetRollupStatus(id string) (RollupStatus, error)
|
||||
@@ -166,6 +169,7 @@ type L1MessageOrm interface {
|
||||
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
|
||||
GetL1ProcessedNonce() (int64, error)
|
||||
GetL1Messages(fields map[string]interface{}, args ...string) ([]*L1Message, error)
|
||||
SaveL1Messages(ctx context.Context, messages []*L1Message) error
|
||||
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error
|
||||
UpdateLayer1Status(ctx context.Context, msgHash string, status MsgStatus) error
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
@@ -46,7 +48,7 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
|
||||
|
||||
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -87,6 +89,37 @@ func (m *l1MessageOrm) GetL1ProcessedNonce() (int64, error) {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
// GetL1Messages get l1 messages by k-v map and args.
|
||||
func (m *l1MessageOrm) GetL1Messages(fields map[string]interface{}, args ...string) ([]*L1Message, error) {
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash, status FROM l2_message WHERE 1 = 1 "
|
||||
for key := range fields {
|
||||
query += fmt.Sprintf(" AND %s=:%s ", key, key)
|
||||
}
|
||||
query = strings.Join(append([]string{query}, args...), " ")
|
||||
|
||||
db := m.db
|
||||
rows, err := db.NamedQuery(db.Rebind(query), fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var msgs []*L1Message
|
||||
for rows.Next() {
|
||||
msg := &L1Message{}
|
||||
if err = rows.StructScan(&msg); err != nil {
|
||||
break
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
|
||||
// log.Warn("no unprocessed layer2 messages in db", "err", err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msgs, rows.Close()
|
||||
}
|
||||
|
||||
// SaveL1Messages batch save a list of layer1 messages
|
||||
func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*L1Message) error {
|
||||
if len(messages) == 0 {
|
||||
@@ -107,9 +140,10 @@ func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*L1Message
|
||||
"deadline": msg.Deadline,
|
||||
"calldata": msg.Calldata,
|
||||
"layer1_hash": msg.Layer1Hash,
|
||||
"layer2_hash": msg.Layer2Hash,
|
||||
}
|
||||
}
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash);`, messageMaps)
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
|
||||
if err != nil {
|
||||
nonces := make([]uint64, 0, len(messages))
|
||||
heights := make([]uint64, 0, len(messages))
|
||||
|
||||
@@ -88,9 +88,9 @@ func (m *layer2MessageOrm) GetL2ProcessedNonce() (int64, error) {
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
// GetL2MessagesByStatus fetch list of messages given msg status
|
||||
// GetL2Messages fetch list of messages given msg status
|
||||
func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*L2Message, error) {
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash FROM l2_message WHERE 1 = 1 "
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash, layer1_hash FROM l2_message WHERE 1 = 1 "
|
||||
for key := range fields {
|
||||
query += fmt.Sprintf("AND %s=:%s ", key, key)
|
||||
}
|
||||
@@ -138,11 +138,12 @@ func (m *layer2MessageOrm) SaveL2Messages(ctx context.Context, messages []*L2Mes
|
||||
"gas_limit": msg.GasLimit,
|
||||
"deadline": msg.Deadline,
|
||||
"calldata": msg.Calldata,
|
||||
"layer1_hash": msg.Layer1Hash,
|
||||
"layer2_hash": msg.Layer2Hash,
|
||||
}
|
||||
}
|
||||
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer2_hash);`, messageMaps)
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
|
||||
if err != nil {
|
||||
nonces := make([]uint64, 0, len(messages))
|
||||
heights := make([]uint64, 0, len(messages))
|
||||
|
||||
Reference in New Issue
Block a user