Let sender instance support more type of TxMeta.

This commit is contained in:
maskpp
2023-02-15 20:08:34 +08:00
parent c674b7c704
commit e9f9e76c10
3 changed files with 66 additions and 31 deletions

View File

@@ -145,7 +145,7 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
return err
}
hash, err := r.messageSender.SendTransaction(&confirmMsg{TxType: "MessageRelay", ID: msg.MsgHash}, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
hash, err := r.messageSender.SendTransaction(&sender.TxMeta{TxType: "MessageRelay", ID: msg.MsgHash}, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
@@ -245,7 +245,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
}
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
hash, err := r.rollupSender.SendTransaction(&confirmMsg{TxType: "BatchCommitment", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchCommitment", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
@@ -347,7 +347,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
}
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(&confirmMsg{TxType: "BatchFinalization", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
txHash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchFinalization", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
@@ -417,19 +417,13 @@ func (r *Layer2Relayer) Stop() {
close(r.stopCh)
}
type confirmMsg struct {
TxType string
ID string
}
func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
if !confirmation.IsSuccessful {
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
return
}
txMeta, _ := confirmation.TxMeta.(*confirmMsg)
transactionType := txMeta.TxType
switch transactionType {
txMeta, _ := confirmation.TxMeta.(*sender.TxMeta)
switch txMeta.TxType {
case "MessageRelay":
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, txMeta.ID, orm.MsgConfirmed, confirmation.TxHash.String())
if err != nil {
@@ -447,5 +441,5 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", txMeta.ID, "err", err)
}
}
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
log.Info("transaction confirmed in layer1", "type", txMeta.TxType, "confirmation", confirmation)
}

View File

@@ -3,6 +3,7 @@ package sender
import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
"math/big"
@@ -41,6 +42,12 @@ var (
ErrNoAvailableAccount = errors.New("sender has no available account to send transaction")
)
// TxMeta is prepared if one sender instance be used in two more places.
type TxMeta struct {
TxType string
ID string
}
// Confirmation struct used to indicate transaction confirmation details
type Confirmation struct {
TxMeta interface{}
@@ -189,23 +196,47 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
}, nil
}
func marshalTxMeta(msg interface{}) string {
switch msg.(type) {
case nil:
return ""
case string:
return msg.(string)
case *TxMeta:
meta := msg.(*TxMeta)
if meta.ID == "" || meta.TxType == "" {
return ""
}
data, _ := json.Marshal(msg)
return string(data)
default:
data, _ := json.Marshal(msg)
return string(data)
}
}
// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(txMeta interface{}, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
// Use marshal message as id, in order avoid repeat txMetas.
var metaId string
if metaId = marshalTxMeta(txMeta); metaId == "" {
return common.Hash{}, fmt.Errorf("empty txMeta is not allowed")
}
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(txMeta, nil); loaded {
if _, loaded := s.pendingTxs.LoadOrStore(metaId, nil); loaded {
return common.Hash{}, fmt.Errorf("attempted to send duplicate transaction, txMeta: %v", txMeta)
}
// get
auth := s.auths.getAccount()
if auth == nil {
s.pendingTxs.Delete(txMeta) // release the ID on failure
s.pendingTxs.Delete(metaId) // release the ID on failure
return common.Hash{}, ErrNoAvailableAccount
}
defer s.auths.releaseAccount(auth)
defer func() {
if err != nil {
s.pendingTxs.Delete(txMeta) // release the ID on failure
s.pendingTxs.Delete(metaId) // release the ID on failure
}
}()
@@ -226,7 +257,7 @@ func (s *Sender) SendTransaction(txMeta interface{}, target *common.Address, val
submitAt: atomic.LoadUint64(&s.blockNumber),
feeData: feeData,
}
s.pendingTxs.Store(txMeta, pending)
s.pendingTxs.Store(metaId, pending)
return tx.Hash(), nil
}

View File

@@ -60,12 +60,18 @@ func TestSender(t *testing.T) {
})
}
type confirmMsg struct {
TxType string
ID string
}
func testSenderMsg(t *testing.T) {
var tests = []struct {
txMeta interface{}
isFail bool
}{
{1, false},
{nil, true},
{"", true},
{&sender.TxMeta{}, true},
{&sender.TxMeta{TxType: "", ID: "1"}, true},
{&sender.TxMeta{TxType: "1", ID: ""}, true},
}
newSender, err := sender.NewSender(context.Background(), cfg.L1Config.RelayerConfig.SenderConfig, privateKeys)
if err != nil {
t.Fatal(err)
@@ -73,12 +79,17 @@ func testSenderMsg(t *testing.T) {
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
msg0 := &confirmMsg{ID: "1", TxType: "1"}
_, err = newSender.SendTransaction(msg0, &toAddr, nil, nil)
assert.NoError(t, err)
_, err = newSender.SendTransaction(msg0, &toAddr, nil, nil)
assert.Error(t, err)
for i, test := range tests {
_, err = newSender.SendTransaction(test.txMeta, &toAddr, nil, nil)
if test.isFail && err == nil {
t.Errorf("Test %d should fail", i)
continue
}
if !test.isFail && err != nil {
t.Errorf("Test %d should pass but got err: %v", i, err)
continue
}
}
}
func testBatchSender(t *testing.T, batchSize int) {
@@ -100,9 +111,8 @@ func testBatchSender(t *testing.T, batchSize int) {
// send transactions
var (
eg errgroup.Group
idCache = cmap.New()
confirmCh = newSender.ConfirmChan()
eg errgroup.Group
idCache = cmap.New()
)
for idx := 0; idx < newSender.NumberOfAccounts(); idx++ {
index := idx
@@ -132,7 +142,7 @@ func testBatchSender(t *testing.T, batchSize int) {
after := time.After(80 * time.Second)
for {
select {
case cmsg := <-confirmCh:
case cmsg := <-newSender.ConfirmChan():
assert.Equal(t, true, cmsg.IsSuccessful)
_, exist := idCache.Pop(cmsg.TxMeta.(string))
assert.Equal(t, true, exist)