Compare commits

...

39 Commits

Author SHA1 Message Date
maskpp
9f9530faf2 trigger ci 2023-02-23 14:47:13 +08:00
maskpp
ce7962a1f2 fix comments. 2023-02-17 11:44:21 +08:00
maskpp
20e345453e Add Handle confirm process. 2023-02-17 11:37:14 +08:00
maskpp
8548d6b25d fix bug 2023-02-16 22:17:18 +08:00
maskpp
d0cd0f654d Change l2 relayer's init logic. 2023-02-16 22:10:57 +08:00
maskpp
2422bb8953 Update version 2023-02-14 11:21:37 +08:00
maskpp
db957af22d change check logic 2023-02-14 11:11:01 +08:00
maskpp
a6ff1d01dd change check logic 2023-02-14 11:04:06 +08:00
maskpp
6aa02874c0 change check logic 2023-02-14 10:59:14 +08:00
maskpp
714343c2b8 Update bridge/l1/relayer.go
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-02-14 10:02:53 +08:00
HAOYUatHZ
a26cc23c9a Merge branch 'staging' into maskpp/resender 2023-02-13 20:40:42 +08:00
maskpp
7f1eddb6f1 Fix test case. 2023-02-13 16:24:06 +08:00
maskpp
40f703e87d Merge staging branch and fix conflict. 2023-02-13 16:10:27 +08:00
colinlyguo
7bd80f1656 bump version 2023-02-10 21:52:56 +08:00
colin
a7a4115fb6 Merge branch 'staging' into maskpp/resender 2023-02-10 21:51:26 +08:00
colinlyguo
6c3f506e98 rename messagePack to packRelayMessage 2023-02-10 19:55:44 +08:00
maskpp
b589ebeaba revert mistake change 2023-02-10 16:49:31 +08:00
maskpp
631915480b fix comments. 2023-02-10 16:42:59 +08:00
maskpp
ee15881a04 Update bridge/l2/relayer_finalize.go
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-02-10 16:35:55 +08:00
maskpp
80058c55d7 fix sender test case. 2023-02-10 10:34:38 +08:00
maskpp
fa0c8f1ba2 merge staging branch and fix conflict 2023-02-10 10:21:29 +08:00
maskpp
fd65128ec1 Update bridge/l2/relayer_finalize.go
Co-authored-by: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com>
2023-02-10 10:08:39 +08:00
maskpp
fd17b869cd Update bridge/l2/relayer_commit.go
Co-authored-by: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com>
2023-02-10 10:08:22 +08:00
maskpp
3297e1ef9c Update bridge/l1/relayer.go
Co-authored-by: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com>
2023-02-10 10:06:20 +08:00
maskpp
828b0592f7 fix ci 2023-02-09 17:03:52 +08:00
maskpp
0265e1a1fe fix ci 2023-02-09 14:56:31 +08:00
maskpp
ddb6bf36f4 fix ci 2023-02-09 14:51:20 +08:00
maskpp
bce93a020d Upgrade check logic. 2023-02-09 14:27:56 +08:00
maskpp
5a336bd5a9 Upgrade check logic. 2023-02-09 11:54:30 +08:00
maskpp
857442fb6b fix ci lint 2023-02-08 18:33:27 +08:00
maskpp
1fc0477f43 fix comments 2023-02-08 18:27:54 +08:00
maskpp
c17f62718e fix comments 2023-02-08 18:27:54 +08:00
maskpp
5c9dd1bd32 Merge branch 'staging' into maskpp/resender 2023-02-08 18:27:28 +08:00
maskpp
e676f5e44a Update bridge/l2/relayer_message.go
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-02-08 17:55:21 +08:00
maskpp
0156e42e18 insert 2023-02-08 10:34:37 +08:00
maskpp
7ee7f93f3f The latest l1geth require . 2023-02-08 10:34:37 +08:00
maskpp
da096f8dd2 Merge branch 'staging' into maskpp/resender 2023-02-07 21:11:27 +08:00
maskpp
5ca3d1a03e Update version 2023-02-07 21:02:32 +08:00
maskpp
a00d3e9a69 resend tx 2023-02-07 20:59:45 +08:00
16 changed files with 871 additions and 324 deletions

View File

@@ -16,7 +16,8 @@
"escalate_multiple_den": 10, "escalate_multiple_den": 10,
"max_gas_price": 10000000000, "max_gas_price": 10000000000,
"tx_type": "LegacyTx", "tx_type": "LegacyTx",
"min_balance": 100000000000000000000 "min_balance": 100000000000000000000,
"pending_limit": 500
}, },
"message_sender_private_keys": [ "message_sender_private_keys": [
"1212121212121212121212121212121212121212121212121212121212121212" "1212121212121212121212121212121212121212121212121212121212121212"
@@ -39,7 +40,8 @@
"escalate_multiple_den": 10, "escalate_multiple_den": 10,
"max_gas_price": 10000000000, "max_gas_price": 10000000000,
"tx_type": "LegacyTx", "tx_type": "LegacyTx",
"min_balance": 100000000000000000000 "min_balance": 100000000000000000000,
"pending_limit": 500
}, },
"message_sender_private_keys": [ "message_sender_private_keys": [
"1212121212121212121212121212121212121212121212121212121212121212" "1212121212121212121212121212121212121212121212121212121212121212"

View File

@@ -30,7 +30,8 @@ type SenderConfig struct {
// The transaction type to use: LegacyTx, AccessListTx, DynamicFeeTx // The transaction type to use: LegacyTx, AccessListTx, DynamicFeeTx
TxType string `json:"tx_type"` TxType string `json:"tx_type"`
// The min balance set for check and set balance for sender's accounts. // The min balance set for check and set balance for sender's accounts.
MinBalance *big.Int `json:"min_balance,omitempty"` MinBalance *big.Int `json:"min_balance,omitempty"`
PendingLimit int64 `json:"pending_limit,omitempty"`
} }
// RelayerConfig loads relayer configuration items. // RelayerConfig loads relayer configuration items.

View File

@@ -3,15 +3,17 @@ package l1
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"time" "time"
// not sure if this will make problems when relay with l1geth
"github.com/scroll-tech/go-ethereum/accounts/abi" "github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto" "github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/log"
"modernc.org/mathutil"
"scroll-tech/common/utils"
"scroll-tech/database/orm" "scroll-tech/database/orm"
@@ -55,7 +57,7 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
return nil, err return nil, err
} }
return &Layer1Relayer{ layer1 := &Layer1Relayer{
ctx: ctx, ctx: ctx,
sender: sender, sender: sender,
db: db, db: db,
@@ -63,7 +65,89 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
cfg: cfg, cfg: cfg,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
confirmationCh: sender.ConfirmChan(), confirmationCh: sender.ConfirmChan(),
}, nil }
// Deal with broken transactions.
if err = layer1.prepare(ctx); err != nil {
return nil, err
}
return layer1, nil
}
// prepare to run check logic and until it's finished.
func (r *Layer1Relayer) prepare(ctx context.Context) error {
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case cfm := <-r.confirmationCh:
if !cfm.IsSuccessful {
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, orm.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
}
}
}(ctx)
if err := r.checkSubmittedMessages(); err != nil {
log.Error("failed to init layer1 submitted tx", "err", err)
return err
}
// Wait forever util sender is empty.
utils.TryTimes(-1, func() bool {
return r.sender.PendingCount() == 0
})
return nil
}
func (r *Layer1Relayer) checkSubmittedMessages() error {
var blockNumber uint64
BEGIN:
msgs, err := r.db.GetL1Messages(
map[string]interface{}{"status": orm.MsgSubmitted},
fmt.Sprintf("AND height > %d", blockNumber),
fmt.Sprintf("ORDER BY height ASC LIMIT %d", 100),
)
if err != nil || len(msgs) == 0 {
return err
}
for msg := msgs[0]; len(msgs) > 0; { //nolint:staticcheck
// If pending txs pool is full, wait a while and retry.
if r.sender.IsFull() {
log.Warn("layer1 sender pending tx reaches pending limit")
time.Sleep(time.Millisecond * 500)
continue
}
msg, msgs = msgs[0], msgs[1:]
blockNumber = mathutil.MaxUint64(blockNumber, msg.Height)
data, err := r.packRelayMessage(msg)
if err != nil {
continue
}
err = r.sender.LoadOrSendTx(
common.HexToHash(msg.Layer2Hash),
msg.MsgHash,
&r.cfg.MessengerContractAddress,
big.NewInt(0),
data,
)
if err != nil {
log.Error("failed to load or send l1 submitted tx", "msg hash", msg.MsgHash, "err", err)
}
}
goto BEGIN
} }
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain // ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
@@ -89,7 +173,7 @@ func (r *Layer1Relayer) ProcessSavedEvents() {
} }
} }
func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error { func (r *Layer1Relayer) packRelayMessage(msg *orm.L1Message) ([]byte, error) {
// @todo add support to relay multiple messages // @todo add support to relay multiple messages
from := common.HexToAddress(msg.Sender) from := common.HexToAddress(msg.Sender)
target := common.HexToAddress(msg.Target) target := common.HexToAddress(msg.Target)
@@ -107,9 +191,16 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
if err != nil { if err != nil {
log.Error("Failed to pack relayMessage", "msg.nonce", msg.Nonce, "msg.height", msg.Height, "err", err) log.Error("Failed to pack relayMessage", "msg.nonce", msg.Nonce, "msg.height", msg.Height, "err", err)
// TODO: need to skip this message by changing its status to MsgError // TODO: need to skip this message by changing its status to MsgError
return nil, err
}
return data, nil
}
func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
data, err := r.packRelayMessage(msg)
if err != nil {
return err return err
} }
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data) hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" { if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired) return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)

View File

@@ -2,28 +2,20 @@ package l2
import ( import (
"context" "context"
"errors"
"fmt"
"math/big"
"runtime"
"sync" "sync"
"time" "time"
// not sure if this will make problems when relay with l1geth // not sure if this will make problems when relay with l1geth
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/log"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil" "scroll-tech/common/utils"
"scroll-tech/database" "scroll-tech/database"
"scroll-tech/database/orm" "scroll-tech/database/orm"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/config" "scroll-tech/bridge/config"
"scroll-tech/bridge/sender" "scroll-tech/bridge/sender"
"scroll-tech/bridge/utils"
) )
// Layer2Relayer is responsible for // Layer2Relayer is responsible for
@@ -38,13 +30,11 @@ type Layer2Relayer struct {
db database.OrmFactory db database.OrmFactory
cfg *config.RelayerConfig cfg *config.RelayerConfig
messageSender *sender.Sender messageSender *sender.Sender
messageCh <-chan *sender.Confirmation messageCh <-chan *sender.Confirmation
l1MessengerABI *abi.ABI
rollupSender *sender.Sender rollupSender *sender.Sender
rollupCh <-chan *sender.Confirmation rollupCh <-chan *sender.Confirmation
l1RollupABI *abi.ABI
// A list of processing message. // A list of processing message.
// key(string): confirmation ID, value(string): layer2 hash. // key(string): confirmation ID, value(string): layer2 hash.
@@ -76,326 +66,65 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
return nil, err return nil, err
} }
return &Layer2Relayer{ layer2 := &Layer2Relayer{
ctx: ctx, ctx: ctx,
db: db, db: db,
messageSender: messageSender, messageSender: messageSender,
messageCh: messageSender.ConfirmChan(), messageCh: messageSender.ConfirmChan(),
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
rollupSender: rollupSender, rollupSender: rollupSender,
rollupCh: rollupSender.ConfirmChan(), rollupCh: rollupSender.ConfirmChan(),
l1RollupABI: bridge_abi.RollupMetaABI,
cfg: cfg, cfg: cfg,
processingMessage: sync.Map{}, processingMessage: sync.Map{},
processingCommitment: sync.Map{}, processingCommitment: sync.Map{},
processingFinalization: sync.Map{}, processingFinalization: sync.Map{},
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
}, nil }
// Deal with broken transactions.
if err = layer2.prepare(ctx); err != nil {
return nil, err
}
return layer2, nil
} }
const processMsgLimit = 100 // prepare to run check logic and until it's finished.
func (r *Layer2Relayer) prepare(ctx context.Context) error {
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain go func(ctx context.Context) {
func (r *Layer2Relayer) ProcessSavedEvents() { for {
batch, err := r.db.GetLatestFinalizedBatch() select {
if err != nil { case <-ctx.Done():
log.Error("GetLatestFinalizedBatch failed", "err", err) return
return case confirmation := <-r.messageCh:
} r.handleConfirmation(confirmation)
case confirmation := <-r.rollupCh:
// msgs are sorted by nonce in increasing order r.handleConfirmation(confirmation)
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": orm.MsgPending},
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
return
}
// process messages in batches
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
if size = len(msgs); size > batchSize {
size = batchSize
}
var g errgroup.Group
for _, msg := range msgs[:size] {
msg := msg
g.Go(func() error {
return r.processSavedEvent(msg, batch.Index)
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process l2 saved event", "err", err)
} }
return
} }
} }(ctx)
}
func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) error { if err := r.checkSubmittedMessages(); err != nil {
// @todo fetch merkle proof from l2geth log.Error("failed to init layer2 submitted tx", "err", err)
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
BlockHeight: big.NewInt(int64(msg.Height)),
BatchIndex: big.NewInt(0).SetUint64(index),
MerkleProof: make([]byte, 0),
}
from := common.HexToAddress(msg.Sender)
target := common.HexToAddress(msg.Target)
value, ok := big.NewInt(0).SetString(msg.Value, 10)
if !ok {
// @todo maybe panic?
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// TODO: need to skip this message by changing its status to MsgError
}
fee, _ := big.NewInt(0).SetString(msg.Fee, 10)
deadline := big.NewInt(int64(msg.Deadline))
msgNonce := big.NewInt(int64(msg.Nonce))
calldata := common.Hex2Bytes(msg.Calldata)
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, fee, deadline, msgNonce, calldata, proof)
if err != nil {
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
// TODO: need to skip this message by changing its status to MsgError
return err return err
} }
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data) if err := r.checkCommittingBatches(); err != nil {
if err != nil && err.Error() == "execution reverted: Message expired" { log.Error("failed to init layer2 committed tx", "err", err)
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err return err
} }
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
// save status in db if err := r.checkFinalizingBatches(); err != nil {
// @todo handle db error log.Error("failed to init layer2 finalized tx", "err", err)
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, orm.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
return err return err
} }
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
// Wait forever until message sender and roller sender are empty.
utils.TryTimes(-1, func() bool {
return r.messageSender.PendingCount() == 0 && r.rollupSender.PendingCount() == 0
})
return nil return nil
} }
// ProcessPendingBatches submit batch data to layer 1 rollup contract
func (r *Layer2Relayer) ProcessPendingBatches() {
// batches are sorted by batch index in increasing order
batchesInDB, err := r.db.GetPendingBatches(1)
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}
if len(batchesInDB) == 0 {
return
}
id := batchesInDB[0]
// @todo add support to relay multiple batches
batches, err := r.db.GetBlockBatches(map[string]interface{}{"id": id})
if err != nil || len(batches) == 0 {
log.Error("Failed to GetBlockBatches", "batch_id", id, "err", err)
return
}
batch := batches[0]
traces, err := r.db.GetBlockTraces(map[string]interface{}{"batch_id": id}, "ORDER BY number ASC")
if err != nil || len(traces) == 0 {
log.Error("Failed to GetBlockTraces", "batch_id", id, "err", err)
return
}
layer2Batch := &bridge_abi.IZKRollupLayer2Batch{
BatchIndex: batch.Index,
ParentHash: common.HexToHash(batch.ParentHash),
Blocks: make([]bridge_abi.IZKRollupLayer2BlockHeader, len(traces)),
}
parentHash := common.HexToHash(batch.ParentHash)
for i, trace := range traces {
layer2Batch.Blocks[i] = bridge_abi.IZKRollupLayer2BlockHeader{
BlockHash: trace.Header.Hash(),
ParentHash: parentHash,
BaseFee: trace.Header.BaseFee,
StateRoot: trace.StorageTrace.RootAfter,
BlockHeight: trace.Header.Number.Uint64(),
GasUsed: 0,
Timestamp: trace.Header.Time,
ExtraData: make([]byte, 0),
Txs: make([]bridge_abi.IZKRollupLayer2Transaction, len(trace.Transactions)),
}
for j, tx := range trace.Transactions {
layer2Batch.Blocks[i].Txs[j] = bridge_abi.IZKRollupLayer2Transaction{
Caller: tx.From,
Nonce: tx.Nonce,
Gas: tx.Gas,
GasPrice: tx.GasPrice.ToInt(),
Value: tx.Value.ToInt(),
Data: common.Hex2Bytes(tx.Data),
R: tx.R.ToInt(),
S: tx.S.ToInt(),
V: tx.V.ToInt().Uint64(),
}
if tx.To != nil {
layer2Batch.Blocks[i].Txs[j].Target = *tx.To
}
layer2Batch.Blocks[i].GasUsed += trace.ExecutionResults[j].Gas
}
// for next iteration
parentHash = layer2Batch.Blocks[i].BlockHash
}
data, err := r.l1RollupABI.Pack("commitBatch", layer2Batch)
if err != nil {
log.Error("Failed to pack commitBatch", "id", id, "index", batch.Index, "err", err)
return
}
txID := id + "-commit"
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
}
return
}
log.Info("commitBatch in layer1", "batch_id", id, "index", batch.Index, "hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupCommitting)
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
}
r.processingCommitment.Store(txID, id)
}
// ProcessCommittedBatches submit proof to layer 1 rollup contract
func (r *Layer2Relayer) ProcessCommittedBatches() {
// set skipped batches in a single db operation
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order
batches, err := r.db.GetCommittedBatches(1)
if err != nil {
log.Error("Failed to fetch committed L2 batches", "err", err)
return
}
if len(batches) == 0 {
return
}
id := batches[0]
// @todo add support to relay multiple batches
status, err := r.db.GetProvingStatusByID(id)
if err != nil {
log.Error("GetProvingStatusByID failed", "id", id, "err", err)
return
}
switch status {
case orm.ProvingTaskUnassigned, orm.ProvingTaskAssigned:
// The proof for this block is not ready yet.
return
case orm.ProvingTaskProved:
// It's an intermediate state. The roller manager received the proof but has not verified
// the proof yet. We don't roll up the proof until it's verified.
return
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
}
case orm.ProvingTaskVerified:
log.Info("Start to roll up zk proof", "id", id)
success := false
defer func() {
// TODO: need to revisit this and have a more fine-grained error handling
if !success {
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "id", id)
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
}
}
}()
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByID(id)
if err != nil {
log.Warn("fetch get proof by id failed", "id", id, "err", err)
return
}
if proofBuffer == nil || instanceBuffer == nil {
log.Warn("proof or instance not ready", "id", id)
return
}
if len(proofBuffer)%32 != 0 {
log.Error("proof buffer has wrong length", "id", id, "length", len(proofBuffer))
return
}
if len(instanceBuffer)%32 != 0 {
log.Warn("instance buffer has wrong length", "id", id, "length", len(instanceBuffer))
return
}
proof := utils.BufferToUint256Le(proofBuffer)
instance := utils.BufferToUint256Le(instanceBuffer)
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(id), proof, instance)
if err != nil {
log.Error("Pack finalizeBatchWithProof failed", "err", err)
return
}
txID := id + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("finalizeBatchWithProof in layer1 failed", "id", id, "err", err)
}
return
}
log.Info("finalizeBatchWithProof in layer1", "batch_id", id, "hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupFinalizing)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
}
success = true
r.processingFinalization.Store(txID, id)
default:
log.Error("encounter unreachable case in ProcessCommittedBatches",
"block_status", status,
)
}
}
// Start the relayer process // Start the relayer process
func (r *Layer2Relayer) Start() { func (r *Layer2Relayer) Start() {
loop := func(ctx context.Context, f func()) { loop := func(ctx context.Context, f func()) {

171
bridge/l2/relayer_commit.go Normal file
View File

@@ -0,0 +1,171 @@
package l2
import (
"errors"
"fmt"
"math/big"
"time"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"modernc.org/mathutil"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/sender"
"scroll-tech/database/orm"
)
func (r *Layer2Relayer) checkCommittingBatches() error {
var batchIndex uint64
BEGIN:
batches, err := r.db.GetBlockBatches(
map[string]interface{}{"rollup_status": orm.RollupCommitting},
fmt.Sprintf("AND index > %d", batchIndex),
fmt.Sprintf("ORDER BY index ASC LIMIT %d", 10),
)
if err != nil || len(batches) == 0 {
return err
}
for batch := batches[0]; len(batches) > 0; { //nolint:staticcheck
// If pending txs pool is full, wait a while and retry.
if r.rollupSender.IsFull() {
log.Warn("layer2 rollup sender pending committed tx reaches pending limit")
time.Sleep(time.Millisecond * 500)
continue
}
batch, batches = batches[0], batches[1:]
id := batch.ID
batchIndex = mathutil.MaxUint64(batchIndex, batch.Index)
txStr, err := r.db.GetCommitTxHash(id)
if err != nil {
log.Error("failed to get commit_tx_hash from block_batch", "err", err)
continue
}
_, data, err := r.packCommitBatch(id)
if err != nil {
log.Error("failed to load or send committed tx", "batch id", id, "err", err)
continue
}
txID := id + "-commit"
err = r.rollupSender.LoadOrSendTx(
common.HexToHash(txStr.String),
txID,
&r.cfg.RollupContractAddress,
big.NewInt(0),
data,
)
if err != nil {
log.Error("failed to load or send tx", "batch id", id, "err", err)
} else {
r.processingCommitment.Store(txID, id)
}
}
goto BEGIN
}
func (r *Layer2Relayer) packCommitBatch(id string) (*orm.BlockBatch, []byte, error) {
batches, err := r.db.GetBlockBatches(map[string]interface{}{"id": id})
if err != nil || len(batches) == 0 {
log.Error("Failed to GetBlockBatches", "batch_id", id, "err", err)
return nil, nil, err
}
batch := batches[0]
traces, err := r.db.GetBlockTraces(map[string]interface{}{"batch_id": id}, "ORDER BY number ASC")
if err != nil || len(traces) == 0 {
log.Error("Failed to GetBlockTraces", "batch_id", id, "err", err)
return nil, nil, err
}
layer2Batch := &bridge_abi.IZKRollupLayer2Batch{
BatchIndex: batch.Index,
ParentHash: common.HexToHash(batch.ParentHash),
Blocks: make([]bridge_abi.IZKRollupLayer2BlockHeader, len(traces)),
}
parentHash := common.HexToHash(batch.ParentHash)
for i, trace := range traces {
layer2Batch.Blocks[i] = bridge_abi.IZKRollupLayer2BlockHeader{
BlockHash: trace.Header.Hash(),
ParentHash: parentHash,
BaseFee: trace.Header.BaseFee,
StateRoot: trace.StorageTrace.RootAfter,
BlockHeight: trace.Header.Number.Uint64(),
GasUsed: 0,
Timestamp: trace.Header.Time,
ExtraData: make([]byte, 0),
Txs: make([]bridge_abi.IZKRollupLayer2Transaction, len(trace.Transactions)),
}
for j, tx := range trace.Transactions {
layer2Batch.Blocks[i].Txs[j] = bridge_abi.IZKRollupLayer2Transaction{
Caller: tx.From,
Nonce: tx.Nonce,
Gas: tx.Gas,
GasPrice: tx.GasPrice.ToInt(),
Value: tx.Value.ToInt(),
Data: common.Hex2Bytes(tx.Data),
R: tx.R.ToInt(),
S: tx.S.ToInt(),
V: tx.V.ToInt().Uint64(),
}
if tx.To != nil {
layer2Batch.Blocks[i].Txs[j].Target = *tx.To
}
layer2Batch.Blocks[i].GasUsed += trace.ExecutionResults[j].Gas
}
// for next iteration
parentHash = layer2Batch.Blocks[i].BlockHash
}
data, err := bridge_abi.RollupMetaABI.Pack("commitBatch", layer2Batch)
if err != nil {
log.Error("Failed to pack commitBatch", "id", id, "index", batch.Index, "err", err)
return nil, nil, err
}
return batch, data, nil
}
// ProcessPendingBatches submit batch data to layer 1 rollup contract
func (r *Layer2Relayer) ProcessPendingBatches() {
// batches are sorted by batch index in increasing order
batchesInDB, err := r.db.GetPendingBatches(1)
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}
if len(batchesInDB) == 0 {
return
}
id := batchesInDB[0]
// @todo add support to relay multiple batches
batch, data, err := r.packCommitBatch(id)
if err != nil {
return
}
txID := id + "-commit"
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
}
return
}
log.Info("commitBatch in layer1", "batch_id", id, "index", batch.Index, "hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupCommitting)
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
}
r.processingCommitment.Store(txID, id)
}

View File

@@ -0,0 +1,195 @@
package l2
import (
"errors"
"fmt"
"math/big"
"time"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"modernc.org/mathutil"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/sender"
"scroll-tech/bridge/utils"
"scroll-tech/database/orm"
)
func (r *Layer2Relayer) checkFinalizingBatches() error {
var (
batchLimit = 10
batchIndex uint64
)
BEGIN:
batches, err := r.db.GetBlockBatches(
map[string]interface{}{"rollup_status": orm.RollupFinalizing},
fmt.Sprintf("AND index > %d", batchIndex),
fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit),
)
if err != nil || len(batches) == 0 {
return err
}
for batch := batches[0]; len(batches) > 0; { //nolint:staticcheck
// If pending txs pool is full, wait a while and retry.
if r.rollupSender.IsFull() {
log.Warn("layer2 rollup sender pending finalized tx reaches pending limit")
time.Sleep(time.Millisecond * 500)
continue
}
batch, batches = batches[0], batches[1:]
id := batch.ID
batchIndex = mathutil.MaxUint64(batchIndex, batch.Index)
txStr, err := r.db.GetFinalizeTxHash(id)
if err != nil {
log.Error("failed to get finalize_tx_hash from block_batch", "err", err)
continue
}
data, err := r.packFinalizeBatch(id)
if err != nil {
log.Error("failed to pack finalize data", "err", err)
continue
}
txID := id + "-finalize"
err = r.rollupSender.LoadOrSendTx(
common.HexToHash(txStr.String),
txID,
&r.cfg.RollupContractAddress,
big.NewInt(0),
data,
)
if err != nil {
log.Error("failed to load or send finalized tx", "batch id", id, "err", err)
} else {
r.processingFinalization.Store(txID, id)
}
}
goto BEGIN
}
func (r *Layer2Relayer) packFinalizeBatch(id string) ([]byte, error) {
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByID(id)
if err != nil {
log.Warn("fetch get proof by id failed", "id", id, "err", err)
return nil, err
}
if proofBuffer == nil || instanceBuffer == nil {
log.Warn("proof or instance not ready", "id", id)
return nil, err
}
if len(proofBuffer)%32 != 0 {
log.Error("proof buffer has wrong length", "id", id, "length", len(proofBuffer))
return nil, err
}
if len(instanceBuffer)%32 != 0 {
log.Warn("instance buffer has wrong length", "id", id, "length", len(instanceBuffer))
return nil, err
}
proof := utils.BufferToUint256Le(proofBuffer)
instance := utils.BufferToUint256Le(instanceBuffer)
data, err := bridge_abi.RollupMetaABI.Pack("finalizeBatchWithProof", common.HexToHash(id), proof, instance)
if err != nil {
log.Error("Pack finalizeBatchWithProof failed", "err", err)
return nil, err
}
return data, nil
}
// ProcessCommittedBatches submit proof to layer 1 rollup contract
func (r *Layer2Relayer) ProcessCommittedBatches() {
// set skipped batches in a single db operation
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order
batches, err := r.db.GetCommittedBatches(1)
if err != nil {
log.Error("Failed to fetch committed L2 batches", "err", err)
return
}
if len(batches) == 0 {
return
}
id := batches[0]
// @todo add support to relay multiple batches
status, err := r.db.GetProvingStatusByID(id)
if err != nil {
log.Error("GetProvingStatusByID failed", "id", id, "err", err)
return
}
switch status {
case orm.ProvingTaskUnassigned, orm.ProvingTaskAssigned:
// The proof for this block is not ready yet.
return
case orm.ProvingTaskProved:
// It's an intermediate state. The roller manager received the proof but has not verified
// the proof yet. We don't roll up the proof until it's verified.
return
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
}
case orm.ProvingTaskVerified:
log.Info("Start to roll up zk proof", "id", id)
success := false
defer func() {
// TODO: need to revisit this and have a more fine-grained error handling
if !success {
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "id", id)
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
}
}
}()
// Pack finalize data.
data, err := r.packFinalizeBatch(id)
if err != nil {
return
}
txID := id + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("finalizeBatchWithProof in layer1 failed", "id", id, "err", err)
}
return
}
log.Info("finalizeBatchWithProof in layer1", "batch_id", id, "hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, id, hash.String(), orm.RollupFinalizing)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
}
success = true
r.processingFinalization.Store(txID, id)
default:
log.Error("encounter unreachable case in ProcessCommittedBatches",
"block_status", status,
)
}
}

View File

@@ -0,0 +1,183 @@
package l2
import (
"errors"
"fmt"
"math/big"
"runtime"
"time"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/sender"
"scroll-tech/database/orm"
)
const processMsgLimit = 100
func (r *Layer2Relayer) checkSubmittedMessages() error {
var nonce uint64
BEGIN:
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": orm.MsgSubmitted},
fmt.Sprintf("AND nonce > %d", nonce),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil || len(msgs) == 0 {
return err
}
var batch *orm.BlockBatch
for msg := msgs[0]; len(msgs) > 0; { //nolint:staticcheck
// If pending pool is full, wait a while and retry.
if r.messageSender.IsFull() {
log.Warn("layer2 message tx sender is full")
time.Sleep(time.Millisecond * 500)
continue
}
msg, msgs = msgs[0], msgs[1:]
nonce = mathutil.MaxUint64(nonce, msg.Nonce)
// Get batch by block number.
if batch == nil || msg.Height < batch.StartBlockNumber || msg.Height > batch.EndBlockNumber {
batches, err := r.db.GetBlockBatches(
map[string]interface{}{},
fmt.Sprintf("AND start_block_number <= %d AND end_block_number >= %d", msg.Height, msg.Height),
)
// If get batch failed, stop and return immediately.
if err != nil || len(batches) == 0 {
return err
}
batch = batches[0]
}
data, err := r.packRelayMessage(msg, batch.Index)
if err != nil {
continue
}
err = r.messageSender.LoadOrSendTx(
common.HexToHash(msg.Layer1Hash),
msg.MsgHash,
&r.cfg.MessengerContractAddress,
big.NewInt(0),
data,
)
if err != nil {
log.Error("failed to load or send l2 submitted tx", "batch id", batch.ID, "msg hash", msg.MsgHash, "err", err)
} else {
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
}
}
goto BEGIN
}
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer2Relayer) ProcessSavedEvents() {
batch, err := r.db.GetLatestFinalizedBatch()
if err != nil {
log.Error("GetLatestFinalizedBatch failed", "err", err)
return
}
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": orm.MsgPending},
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
return
}
// process messages in batches
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
if size = len(msgs); size > batchSize {
size = batchSize
}
var g errgroup.Group
for _, msg := range msgs[:size] {
msg := msg
g.Go(func() error {
return r.processSavedEvent(msg, batch.Index)
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process l2 saved event", "err", err)
}
return
}
}
}
func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) error {
data, err := r.packRelayMessage(msg, index)
if err != nil {
return err
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err
}
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
// save status in db
// @todo handle db error
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, orm.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
return err
}
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
return nil
}
func (r *Layer2Relayer) packRelayMessage(msg *orm.L2Message, index uint64) ([]byte, error) {
// @todo fetch merkle proof from l2geth
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
BlockHeight: big.NewInt(int64(msg.Height)),
BatchIndex: big.NewInt(0).SetUint64(index),
MerkleProof: make([]byte, 0),
}
from := common.HexToAddress(msg.Sender)
target := common.HexToAddress(msg.Target)
value, ok := big.NewInt(0).SetString(msg.Value, 10)
if !ok {
// @todo maybe panic?
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// TODO: need to skip this message by changing its status to MsgError
}
fee, _ := big.NewInt(0).SetString(msg.Fee, 10)
deadline := big.NewInt(int64(msg.Deadline))
msgNonce := big.NewInt(int64(msg.Nonce))
calldata := common.Hex2Bytes(msg.Calldata)
data, err := bridge_abi.L1MessengerMetaABI.Pack("relayMessageWithProof", from, target, value, fee, deadline, msgNonce, calldata, proof)
if err != nil {
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
// TODO: need to skip this message by changing its status to MsgError
return nil, err
}
return data, nil
}

View File

@@ -88,6 +88,7 @@ type Sender struct {
blockNumber uint64 // Current block number on chain. blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain baseFeePerGas uint64 // Current base fee per gas on chain
pendingNum int64 // current pending tx count.
pendingTxs sync.Map // Mapping from nonce to pending transaction pendingTxs sync.Map // Mapping from nonce to pending transaction
confirmCh chan *Confirmation confirmCh chan *Confirmation
@@ -149,6 +150,16 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
return sender, nil return sender, nil
} }
// PendingCount return the current pending txs num.
func (s *Sender) PendingCount() int64 {
return atomic.LoadInt64(&s.pendingNum)
}
// PendingLimit return the maximum pendingTxs can handle.
func (s *Sender) PendingLimit() int64 {
return s.config.PendingLimit
}
// Stop stop the sender module. // Stop stop the sender module.
func (s *Sender) Stop() { func (s *Sender) Stop() {
close(s.stopCh) close(s.stopCh)
@@ -199,16 +210,27 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
}, nil }, nil
} }
// IsFull If pendingTxs pool is full return true.
func (s *Sender) IsFull() bool {
return atomic.LoadInt64(&s.pendingNum) == s.config.PendingLimit
}
// SendTransaction send a signed L2tL1 transaction. // SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) { func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
if s.IsFull() {
return common.Hash{}, fmt.Errorf("pending txs is full, pending size: %d", s.config.PendingLimit)
}
// We occupy the ID, in case some other threads call with the same ID in the same time // We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded { if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID) return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
} }
atomic.AddInt64(&s.pendingNum, 1)
// get // get
auth := s.auths.getAccount() auth := s.auths.getAccount()
if auth == nil { if auth == nil {
s.pendingTxs.Delete(ID) // release the ID on failure s.pendingTxs.Delete(ID) // release the ID on failure
atomic.AddInt64(&s.pendingNum, -1)
return common.Hash{}, ErrNoAvailableAccount return common.Hash{}, ErrNoAvailableAccount
} }
@@ -216,6 +238,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
defer func() { defer func() {
if err != nil { if err != nil {
s.pendingTxs.Delete(ID) // release the ID on failure s.pendingTxs.Delete(ID) // release the ID on failure
atomic.AddInt64(&s.pendingNum, -1)
} }
}() }()
@@ -243,6 +266,61 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
return return
} }
func (s *Sender) getTxAndAddr(txHash common.Hash) (*types.Transaction, uint64, common.Address, error) {
tx, isPending, err := s.client.TransactionByHash(s.ctx, txHash)
if err != nil {
return nil, 0, common.Address{}, err
}
sender, err := types.Sender(types.LatestSignerForChainID(s.chainID), tx)
if err != nil {
return nil, 0, common.Address{}, err
}
if isPending {
return tx, s.blockNumber, sender, nil
}
receipt, err := s.client.TransactionReceipt(s.ctx, txHash)
if err != nil {
return nil, 0, common.Address{}, err
}
return tx, receipt.BlockNumber.Uint64(), sender, nil
}
// LoadOrSendTx If the tx already exist in chain load it or resend it.
func (s *Sender) LoadOrSendTx(destTxHash common.Hash, ID string, target *common.Address, value *big.Int, data []byte) error {
tx, blockNumber, from, err := s.getTxAndAddr(destTxHash)
// If this tx already exist load it to the pending.
if err == nil && tx != nil {
auth := s.auths.accounts[from]
var feeData *FeeData
feeData, err = s.getFeeData(auth, target, value, data)
if err != nil {
return err
}
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return fmt.Errorf("has the repeat tx ID, ID: %s", ID)
}
atomic.AddInt64(&s.pendingNum, 1)
s.pendingTxs.Store(ID, &PendingTransaction{
tx: tx,
id: ID,
signer: auth,
// Record the transaction's block blockNumber.
submitAt: blockNumber,
feeData: feeData,
})
return nil
}
// Tx is dropped from chain node, resend it.
_, err = s.SendTransaction(ID, target, value, data)
return err
}
func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, target *common.Address, value *big.Int, data []byte, overrideNonce *uint64) (tx *types.Transaction, err error) { func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, target *common.Address, value *big.Int, data []byte, overrideNonce *uint64) (tx *types.Transaction, err error) {
var ( var (
nonce = auth.Nonce.Uint64() nonce = auth.Nonce.Uint64()
@@ -388,6 +466,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
if (err == nil) && (receipt != nil) { if (err == nil) && (receipt != nil) {
if receipt.BlockNumber.Uint64() <= confirmed { if receipt.BlockNumber.Uint64() <= confirmed {
s.pendingTxs.Delete(key) s.pendingTxs.Delete(key)
atomic.AddInt64(&s.pendingNum, -1)
// send confirm message // send confirm message
s.confirmCh <- &Confirmation{ s.confirmCh <- &Confirmation{
ID: pending.id, ID: pending.id,
@@ -419,6 +498,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
if strings.Contains(err.Error(), "nonce") { if strings.Contains(err.Error(), "nonce") {
// This key can be deleted // This key can be deleted
s.pendingTxs.Delete(key) s.pendingTxs.Delete(key)
atomic.AddInt64(&s.pendingNum, -1)
// Try get receipt by the latest replaced tx hash // Try get receipt by the latest replaced tx hash
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash()) receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) { if (err == nil) && (receipt != nil) {

View File

@@ -49,6 +49,7 @@ func TestSender(t *testing.T) {
// Setup // Setup
setupEnv(t) setupEnv(t)
t.Run("testLoadOrSendTx", testLoadOrSendTx)
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) }) t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
t.Run("test 3 account sender", func(t *testing.T) { testBatchSender(t, 3) }) t.Run("test 3 account sender", func(t *testing.T) { testBatchSender(t, 3) })
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) }) t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
@@ -59,6 +60,38 @@ func TestSender(t *testing.T) {
}) })
} }
func testLoadOrSendTx(t *testing.T) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = 0
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
if err != nil {
t.Fatal(err)
}
newSender2, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
if err != nil {
t.Fatal(err)
}
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
id := "aaa"
hash, err := newSender.SendTransaction(id, &toAddr, big.NewInt(0), nil)
assert.NoError(t, err)
err = newSender2.LoadOrSendTx(hash, id, &toAddr, big.NewInt(0), nil)
assert.NoError(t, err)
select {
case cfm := <-newSender2.ConfirmChan():
assert.Equal(t, true, cfm.IsSuccessful)
assert.Equal(t, hash, cfm.TxHash)
assert.Equal(t, id, cfm.ID)
case <-time.After(time.Second * 10):
t.Error("testLoadOrSendTx test failed because of timeout")
}
}
func testBatchSender(t *testing.T, batchSize int) { func testBatchSender(t *testing.T, batchSize int) {
for len(privateKeys) < batchSize { for len(privateKeys) < batchSize {
priv, err := crypto.GenerateKey() priv, err := crypto.GenerateKey()

View File

@@ -11,6 +11,6 @@ if [ ! -n "${IPC_PATH}" ];then
IPC_PATH="/tmp/l1geth_path.ipc" IPC_PATH="/tmp/l1geth_path.ipc"
fi fi
exec geth --mine --datadir "." --unlock 0 --password "./password" --allow-insecure-unlock --nodiscover \ exec geth --mine --datadir "." --unlock 0 --miner.etherbase "0x1c5a77d9fa7ef466951b2f01f724bca3a5820b63" --password "./password" --allow-insecure-unlock --nodiscover \
--http --http.addr "0.0.0.0" --http.port 8545 --ws --ws.addr "0.0.0.0" --ws.port 8546 --ipcpath ${IPC_PATH} --http --http.addr "0.0.0.0" --http.port 8545 --ws --ws.addr "0.0.0.0" --ws.port 8546 --ipcpath ${IPC_PATH}

View File

@@ -4,7 +4,7 @@ import "time"
// TryTimes try run several times until the function return true. // TryTimes try run several times until the function return true.
func TryTimes(times int, run func() bool) { func TryTimes(times int, run func() bool) {
for i := 0; i < times; i++ { for i := 0; times == -1 || i < times; i++ {
if run() { if run() {
return return
} }

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug" "runtime/debug"
) )
var tag = "prealpha-v13.1" var tag = "prealpha-v13.2"
var commit = func() string { var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok { if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -236,6 +236,29 @@ func (o *blockBatchOrm) BatchRecordExist(id string) (bool, error) {
return true, nil return true, nil
} }
func (o *blockBatchOrm) GetBatchesByRollupStatus(status RollupStatus, limit uint64) ([]string, error) {
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, status, limit)
if err != nil {
return nil, err
}
var ids []string
for rows.Next() {
var id string
if err = rows.Scan(&id); err != nil {
break
}
ids = append(ids, id)
}
if len(ids) == 0 || errors.Is(err, sql.ErrNoRows) {
// log.Warn("no pending batches in db", "err", err)
} else if err != nil {
return nil, err
}
return ids, rows.Close()
}
func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) { func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, RollupPending, limit) rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, RollupPending, limit)
if err != nil { if err != nil {

View File

@@ -46,6 +46,7 @@ type L1Message struct {
Target string `json:"target" db:"target"` Target string `json:"target" db:"target"`
Calldata string `json:"calldata" db:"calldata"` Calldata string `json:"calldata" db:"calldata"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"` Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Status MsgStatus `json:"status" db:"status"` Status MsgStatus `json:"status" db:"status"`
} }
@@ -62,6 +63,7 @@ type L2Message struct {
Target string `json:"target" db:"target"` Target string `json:"target" db:"target"`
Calldata string `json:"calldata" db:"calldata"` Calldata string `json:"calldata" db:"calldata"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"` Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Status MsgStatus `json:"status" db:"status"` Status MsgStatus `json:"status" db:"status"`
} }
@@ -145,6 +147,7 @@ type BlockBatchOrm interface {
ResetProvingStatusFor(before ProvingStatus) error ResetProvingStatusFor(before ProvingStatus) error
NewBatchInDBTx(dbTx *sqlx.Tx, startBlock *BlockInfo, endBlock *BlockInfo, parentHash string, totalTxNum uint64, gasUsed uint64) (string, error) NewBatchInDBTx(dbTx *sqlx.Tx, startBlock *BlockInfo, endBlock *BlockInfo, parentHash string, totalTxNum uint64, gasUsed uint64) (string, error)
BatchRecordExist(id string) (bool, error) BatchRecordExist(id string) (bool, error)
GetBatchesByRollupStatus(status RollupStatus, limit uint64) ([]string, error)
GetPendingBatches(limit uint64) ([]string, error) GetPendingBatches(limit uint64) ([]string, error)
GetCommittedBatches(limit uint64) ([]string, error) GetCommittedBatches(limit uint64) ([]string, error)
GetRollupStatus(id string) (RollupStatus, error) GetRollupStatus(id string) (RollupStatus, error)
@@ -166,6 +169,7 @@ type L1MessageOrm interface {
GetL1MessageByMsgHash(msgHash string) (*L1Message, error) GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
GetL1ProcessedNonce() (int64, error) GetL1ProcessedNonce() (int64, error)
GetL1Messages(fields map[string]interface{}, args ...string) ([]*L1Message, error)
SaveL1Messages(ctx context.Context, messages []*L1Message) error SaveL1Messages(ctx context.Context, messages []*L1Message) error
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error
UpdateLayer1Status(ctx context.Context, msgHash string, status MsgStatus) error UpdateLayer1Status(ctx context.Context, msgHash string, status MsgStatus) error

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"strings"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/log"
@@ -46,7 +48,7 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status // GetL1MessagesByStatus fetch list of unprocessed messages given msg status
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) { func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit) rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -87,6 +89,37 @@ func (m *l1MessageOrm) GetL1ProcessedNonce() (int64, error) {
return -1, nil return -1, nil
} }
// GetL1Messages get l1 messages by k-v map and args.
func (m *l1MessageOrm) GetL1Messages(fields map[string]interface{}, args ...string) ([]*L1Message, error) {
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash, status FROM l2_message WHERE 1 = 1 "
for key := range fields {
query += fmt.Sprintf(" AND %s=:%s ", key, key)
}
query = strings.Join(append([]string{query}, args...), " ")
db := m.db
rows, err := db.NamedQuery(db.Rebind(query), fields)
if err != nil {
return nil, err
}
var msgs []*L1Message
for rows.Next() {
msg := &L1Message{}
if err = rows.StructScan(&msg); err != nil {
break
}
msgs = append(msgs, msg)
}
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
// log.Warn("no unprocessed layer2 messages in db", "err", err)
} else if err != nil {
return nil, err
}
return msgs, rows.Close()
}
// SaveL1Messages batch save a list of layer1 messages // SaveL1Messages batch save a list of layer1 messages
func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*L1Message) error { func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*L1Message) error {
if len(messages) == 0 { if len(messages) == 0 {
@@ -107,9 +140,10 @@ func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*L1Message
"deadline": msg.Deadline, "deadline": msg.Deadline,
"calldata": msg.Calldata, "calldata": msg.Calldata,
"layer1_hash": msg.Layer1Hash, "layer1_hash": msg.Layer1Hash,
"layer2_hash": msg.Layer2Hash,
} }
} }
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash);`, messageMaps) _, err := m.db.NamedExec(`INSERT INTO public.l1_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil { if err != nil {
nonces := make([]uint64, 0, len(messages)) nonces := make([]uint64, 0, len(messages))
heights := make([]uint64, 0, len(messages)) heights := make([]uint64, 0, len(messages))

View File

@@ -88,9 +88,9 @@ func (m *layer2MessageOrm) GetL2ProcessedNonce() (int64, error) {
return -1, nil return -1, nil
} }
// GetL2MessagesByStatus fetch list of messages given msg status // GetL2Messages fetch list of messages given msg status
func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*L2Message, error) { func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*L2Message, error) {
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash FROM l2_message WHERE 1 = 1 " query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash, layer1_hash FROM l2_message WHERE 1 = 1 "
for key := range fields { for key := range fields {
query += fmt.Sprintf("AND %s=:%s ", key, key) query += fmt.Sprintf("AND %s=:%s ", key, key)
} }
@@ -138,11 +138,12 @@ func (m *layer2MessageOrm) SaveL2Messages(ctx context.Context, messages []*L2Mes
"gas_limit": msg.GasLimit, "gas_limit": msg.GasLimit,
"deadline": msg.Deadline, "deadline": msg.Deadline,
"calldata": msg.Calldata, "calldata": msg.Calldata,
"layer1_hash": msg.Layer1Hash,
"layer2_hash": msg.Layer2Hash, "layer2_hash": msg.Layer2Hash,
} }
} }
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer2_hash);`, messageMaps) _, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :fee, :gas_limit, :deadline, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil { if err != nil {
nonces := make([]uint64, 0, len(messages)) nonces := make([]uint64, 0, len(messages))
heights := make([]uint64, 0, len(messages)) heights := make([]uint64, 0, len(messages))