Compare commits

...

24 Commits

Author SHA1 Message Date
maskpp
c4decb511a Merge branch 'alpha' into maskpp/sender_upgrade_2 2023-02-21 15:22:15 +08:00
maskpp
fab4a4e341 Merge branch 'alpha' into maskpp/sender_upgrade_2 2023-02-20 15:30:41 +08:00
maskpp
8dc3690311 Message hash don't allow to null. 2023-02-16 10:03:09 +08:00
maskpp
692e9ef2f3 trigger ci 2023-02-15 21:04:56 +08:00
maskpp
c09f362962 Let sender instance support more type of TxMeta. 2023-02-15 20:14:15 +08:00
maskpp
cc91bb7676 Let sender instance support more type of TxMeta. 2023-02-15 20:12:00 +08:00
maskpp
e9f9e76c10 Let sender instance support more type of TxMeta. 2023-02-15 20:08:34 +08:00
maskpp
c674b7c704 Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-14 14:26:15 +08:00
colinlyguo
98ffa5f255 fix comments 2023-02-13 21:02:01 +08:00
colin
28d23b6d77 Update bridge/sender/sender.go
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2023-02-13 20:52:52 +08:00
colin
f6e52e3642 Update bridge/l2/relayer.go
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2023-02-13 20:52:23 +08:00
colinlyguo
edb3d9ec69 trigger ci 2023-02-13 17:34:31 +08:00
colin
a50ecc8bf7 Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-13 17:26:05 +08:00
colin
754c1d85bc Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-09 21:41:34 +08:00
colinlyguo
be93e606a2 fix CI 2023-02-09 19:34:21 +08:00
colinlyguo
31bd4ab383 fix CI 2023-02-09 19:23:48 +08:00
colin
4068746b15 Update bridge/sender/sender.go
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-02-09 19:12:36 +08:00
colin
f83e0083c2 Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-09 19:04:35 +08:00
colin
3cd39802fc Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-07 22:38:19 +08:00
maskpp
53d56def8c Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-06 14:22:30 +08:00
maskpp
458e90e44f Merge branch 'staging' into maskpp/sender_upgrade_2 2023-02-06 10:56:13 +08:00
maskpp
876966d9ec trigger ci 2023-02-03 14:54:26 +08:00
maskpp
a2ca73d4e4 Upgrade the usage of sender. 2023-02-03 11:49:26 +08:00
maskpp
95d650240f Upgrade the usage of sender. 2023-02-03 11:47:21 +08:00
7 changed files with 109 additions and 109 deletions

View File

@@ -27,14 +27,12 @@ import (
// Actions are triggered by new head from layer 1 geth node.
// @todo It's better to be triggered by watcher.
type Layer1Relayer struct {
ctx context.Context
sender *sender.Sender
ctx context.Context
db orm.L1MessageOrm
cfg *config.RelayerConfig
// channel used to communicate with transaction sender
confirmationCh <-chan *sender.Confirmation
sender *sender.Sender
l2MessengerABI *abi.ABI
stopCh chan struct{}
@@ -62,7 +60,6 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
l2MessengerABI: l2MessengerABI,
cfg: cfg,
stopCh: make(chan struct{}),
confirmationCh: sender.ConfirmChan(),
}, nil
}
@@ -142,12 +139,12 @@ func (r *Layer1Relayer) Start() {
// number, err := r.client.BlockNumber(r.ctx)
// log.Info("receive header", "height", number)
r.ProcessSavedEvents()
case cfm := <-r.confirmationCh:
case cfm := <-r.sender.ConfirmChan():
if !cfm.IsSuccessful {
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, orm.MsgConfirmed, cfm.TxHash.String())
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.TxMeta.(string), orm.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"runtime"
"sync"
"time"
// not sure if this will make problems when relay with l1geth
@@ -39,25 +38,11 @@ type Layer2Relayer struct {
cfg *config.RelayerConfig
messageSender *sender.Sender
messageCh <-chan *sender.Confirmation
l1MessengerABI *abi.ABI
rollupSender *sender.Sender
rollupCh <-chan *sender.Confirmation
l1RollupABI *abi.ABI
// A list of processing message.
// key(string): confirmation ID, value(string): layer2 hash.
processingMessage sync.Map
// A list of processing batch commitment.
// key(string): confirmation ID, value(string): batch id.
processingCommitment sync.Map
// A list of processing batch finalization.
// key(string): confirmation ID, value(string): batch id.
processingFinalization sync.Map
stopCh chan struct{}
}
@@ -77,19 +62,14 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
}
return &Layer2Relayer{
ctx: ctx,
db: db,
messageSender: messageSender,
messageCh: messageSender.ConfirmChan(),
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
rollupSender: rollupSender,
rollupCh: rollupSender.ConfirmChan(),
l1RollupABI: bridge_abi.RollupMetaABI,
cfg: cfg,
processingMessage: sync.Map{},
processingCommitment: sync.Map{},
processingFinalization: sync.Map{},
stopCh: make(chan struct{}),
ctx: ctx,
db: db,
messageSender: messageSender,
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
rollupSender: rollupSender,
l1RollupABI: bridge_abi.RollupMetaABI,
cfg: cfg,
stopCh: make(chan struct{}),
}, nil
}
@@ -165,7 +145,7 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
return err
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
hash, err := r.messageSender.SendTransaction(&sender.TxMeta{TxType: "MessageRelay", ID: msg.MsgHash}, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
@@ -187,7 +167,6 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
return err
}
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
return nil
}
@@ -265,9 +244,8 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}
txID := id + "-commit"
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchCommitment", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
@@ -281,7 +259,6 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
}
r.processingCommitment.Store(txID, id)
}
// ProcessCommittedBatches submit proof to layer 1 rollup contract
@@ -369,9 +346,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
return
}
txID := id + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
txHash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchFinalization", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
@@ -387,7 +363,6 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
}
success = true
r.processingFinalization.Store(txID, id)
default:
log.Error("encounter unreachable case in ProcessCommittedBatches",
@@ -424,9 +399,9 @@ func (r *Layer2Relayer) Start() {
select {
case <-ctx.Done():
return
case confirmation := <-r.messageCh:
case confirmation := <-r.messageSender.ConfirmChan():
r.handleConfirmation(confirmation)
case confirmation := <-r.rollupCh:
case confirmation := <-r.rollupSender.ConfirmChan():
r.handleConfirmation(confirmation)
}
}
@@ -447,39 +422,24 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
return
}
transactionType := "Unknown"
// check whether it is message relay transaction
if msgHash, ok := r.processingMessage.Load(confirmation.ID); ok {
transactionType = "MessageRelay"
// @todo handle db error
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msgHash.(string), orm.MsgConfirmed, confirmation.TxHash.String())
txMeta, _ := confirmation.TxMeta.(*sender.TxMeta)
switch txMeta.TxType {
case "MessageRelay":
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, txMeta.ID, orm.MsgConfirmed, confirmation.TxHash.String())
if err != nil {
log.Warn("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msgHash.(string), "err", err)
log.Warn("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", txMeta.ID, "err", err)
}
r.processingMessage.Delete(confirmation.ID)
}
// check whether it is block commitment transaction
if batchID, ok := r.processingCommitment.Load(confirmation.ID); ok {
transactionType = "BatchCommitment"
// @todo handle db error
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchID.(string), confirmation.TxHash.String(), orm.RollupCommitted)
case "BatchCommitment":
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, txMeta.ID, confirmation.TxHash.String(), orm.RollupCommitted)
if err != nil {
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "batch_id", batchID.(string), "err", err)
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "batch_id", txMeta.ID, "err", err)
}
r.processingCommitment.Delete(confirmation.ID)
}
// check whether it is proof finalization transaction
if batchID, ok := r.processingFinalization.Load(confirmation.ID); ok {
transactionType = "ProofFinalization"
case "BatchFinalization":
// @todo handle db error
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, batchID.(string), confirmation.TxHash.String(), orm.RollupFinalized)
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, txMeta.ID, confirmation.TxHash.String(), orm.RollupFinalized)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", batchID.(string), "err", err)
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", txMeta.ID, "err", err)
}
r.processingFinalization.Delete(confirmation.ID)
}
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
log.Info("transaction confirmed in layer1", "type", txMeta.TxType, "confirmation", confirmation)
}

View File

@@ -20,6 +20,7 @@ var (
templateL2Message = []*orm.L2Message{
{
Nonce: 1,
MsgHash: "msgHash",
Height: 1,
Sender: "0x596a746661dbed76a84556111c2872249b070e15",
Value: "100",

View File

@@ -3,6 +3,7 @@ package sender
import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
"math/big"
@@ -41,19 +42,15 @@ var (
ErrNoAvailableAccount = errors.New("sender has no available account to send transaction")
)
// DefaultSenderConfig The default config
var DefaultSenderConfig = config.SenderConfig{
Endpoint: "",
EscalateBlocks: 3,
EscalateMultipleNum: 11,
EscalateMultipleDen: 10,
MaxGasPrice: 1000_000_000_000, // this is 1000 gwei
TxType: AccessListTxType,
// TxMeta is prepared if one sender instance be used in two more places.
type TxMeta struct {
TxType string
ID string
}
// Confirmation struct used to indicate transaction confirmation details
type Confirmation struct {
ID string
TxMeta interface{}
IsSuccessful bool
TxHash common.Hash
}
@@ -70,7 +67,7 @@ type FeeData struct {
// PendingTransaction submitted but pending transactions
type PendingTransaction struct {
submitAt uint64
id string
txMeta interface{}
feeData *FeeData
signer *bind.TransactOpts
tx *types.Transaction
@@ -98,7 +95,7 @@ type Sender struct {
// txConfirmationCh is used to notify confirmed transaction
func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.PrivateKey) (*Sender, error) {
if config == nil {
config = &DefaultSenderConfig
return nil, errors.New("create sender instance failed due to the empty config")
}
client, err := ethclient.Dial(config.Endpoint)
if err != nil {
@@ -199,23 +196,44 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
}, nil
}
// marshal txMeta if it's not empty.
func marshalTxMeta(val interface{}) string {
switch msg := val.(type) {
case nil:
return ""
case string:
return msg
case *TxMeta:
if msg.ID == "" || msg.TxType == "" {
return ""
}
}
data, _ := json.Marshal(val)
return string(data)
}
// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
func (s *Sender) SendTransaction(txMeta interface{}, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
// Use marshal message as id, in order avoid repeat txMetas.
var metaID string
if metaID = marshalTxMeta(txMeta); metaID == "" {
return common.Hash{}, fmt.Errorf("empty txMeta is not allowed")
}
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
if _, loaded := s.pendingTxs.LoadOrStore(metaID, nil); loaded {
return common.Hash{}, fmt.Errorf("attempted to send duplicate transaction, txMeta: %v", txMeta)
}
// get
auth := s.auths.getAccount()
if auth == nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Delete(metaID) // release the ID on failure
return common.Hash{}, ErrNoAvailableAccount
}
defer s.auths.releaseAccount(auth)
defer func() {
if err != nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Delete(metaID) // release the ID on failure
}
}()
@@ -231,12 +249,12 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
// add pending transaction to queue
pending := &PendingTransaction{
tx: tx,
id: ID,
txMeta: txMeta,
signer: auth,
submitAt: atomic.LoadUint64(&s.blockNumber),
feeData: feeData,
}
s.pendingTxs.Store(ID, pending)
s.pendingTxs.Store(metaID, pending)
return tx.Hash(), nil
}
@@ -263,7 +281,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
GasPrice: feeData.gasPrice,
Gas: feeData.gasLimit,
To: target,
Value: new(big.Int).Set(value),
Value: value,
Data: common.CopyBytes(data),
V: new(big.Int),
R: new(big.Int),
@@ -276,7 +294,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
GasPrice: feeData.gasPrice,
Gas: feeData.gasLimit,
To: target,
Value: new(big.Int).Set(value),
Value: value,
Data: common.CopyBytes(data),
AccessList: make(types.AccessList, 0),
V: new(big.Int),
@@ -290,7 +308,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
Data: common.CopyBytes(data),
Gas: feeData.gasLimit,
AccessList: make(types.AccessList, 0),
Value: new(big.Int).Set(value),
Value: value,
ChainID: s.chainID,
GasTipCap: feeData.gasTipCap,
GasFeeCap: feeData.gasFeeCap,
@@ -390,7 +408,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
s.pendingTxs.Delete(key)
// send confirm message
s.confirmCh <- &Confirmation{
ID: pending.id,
TxMeta: pending.txMeta,
IsSuccessful: receipt.Status == types.ReceiptStatusSuccessful,
TxHash: pending.tx.Hash(),
}
@@ -424,7 +442,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
if (err == nil) && (receipt != nil) {
// send confirm message
s.confirmCh <- &Confirmation{
ID: pending.id,
TxMeta: pending.txMeta,
IsSuccessful: receipt.Status == types.ReceiptStatusSuccessful,
TxHash: pending.tx.Hash(),
}

View File

@@ -49,6 +49,7 @@ func TestSender(t *testing.T) {
// Setup
setupEnv(t)
t.Run("testSenderMsg", testSenderMsg)
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
t.Run("test 3 account sender", func(t *testing.T) { testBatchSender(t, 3) })
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
@@ -59,6 +60,38 @@ func TestSender(t *testing.T) {
})
}
func testSenderMsg(t *testing.T) {
var tests = []struct {
txMeta interface{}
isFail bool
}{
{1, false},
{nil, true},
{"", true},
{&sender.TxMeta{}, true},
{&sender.TxMeta{TxType: "", ID: "1"}, true},
{&sender.TxMeta{TxType: "1", ID: ""}, true},
}
newSender, err := sender.NewSender(context.Background(), cfg.L1Config.RelayerConfig.SenderConfig, privateKeys)
if err != nil {
t.Fatal(err)
}
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
for i, test := range tests {
_, err = newSender.SendTransaction(test.txMeta, &toAddr, nil, nil)
if test.isFail && err == nil {
t.Errorf("Test %d should fail", i)
continue
}
if !test.isFail && err != nil {
t.Errorf("Test %d should pass but got err: %v", i, err)
continue
}
}
}
func testBatchSender(t *testing.T, batchSize int) {
for len(privateKeys) < batchSize {
priv, err := crypto.GenerateKey()
@@ -78,9 +111,8 @@ func testBatchSender(t *testing.T, batchSize int) {
// send transactions
var (
eg errgroup.Group
idCache = cmap.New()
confirmCh = newSender.ConfirmChan()
eg errgroup.Group
idCache = cmap.New()
)
for idx := 0; idx < newSender.NumberOfAccounts(); idx++ {
index := idx
@@ -110,9 +142,9 @@ func testBatchSender(t *testing.T, batchSize int) {
after := time.After(80 * time.Second)
for {
select {
case cmsg := <-confirmCh:
case cmsg := <-newSender.ConfirmChan():
assert.Equal(t, true, cmsg.IsSuccessful)
_, exist := idCache.Pop(cmsg.ID)
_, exist := idCache.Pop(cmsg.TxMeta.(string))
assert.Equal(t, true, exist)
// Receive all confirmed txs.
if idCache.Count() == 0 {

View File

@@ -292,14 +292,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
}
var err error
tasks, err := m.orm.GetBlockBatches(map[string]interface{}{"id": msg.ID})
if len(tasks) == 0 {
if err != nil {
log.Error("failed to get tasks", "error", err)
}
return err
}
success, err = m.verifier.VerifyProof(msg.Proof)
if err != nil {
// TODO: this is only a temp workaround for testnet, we should return err in real cases

View File

@@ -61,7 +61,7 @@ func testMonitorMetrics(t *testing.T) {
runDBCliApp(t, "migrate", "current version:")
// Start bridge process with metrics server.
port, _ := rand.Int(rand.Reader, big.NewInt(2000))
port, _ := rand.Int(rand.Reader, big.NewInt(10000))
svrPort := strconv.FormatInt(port.Int64()+50000, 10)
bridgeCmd := runBridgeApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort)
bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(time.Second*20, "Start bridge successfully") })