mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 23:18:07 -05:00
Compare commits
24 Commits
test/code
...
maskpp/sen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4decb511a | ||
|
|
fab4a4e341 | ||
|
|
8dc3690311 | ||
|
|
692e9ef2f3 | ||
|
|
c09f362962 | ||
|
|
cc91bb7676 | ||
|
|
e9f9e76c10 | ||
|
|
c674b7c704 | ||
|
|
98ffa5f255 | ||
|
|
28d23b6d77 | ||
|
|
f6e52e3642 | ||
|
|
edb3d9ec69 | ||
|
|
a50ecc8bf7 | ||
|
|
754c1d85bc | ||
|
|
be93e606a2 | ||
|
|
31bd4ab383 | ||
|
|
4068746b15 | ||
|
|
f83e0083c2 | ||
|
|
3cd39802fc | ||
|
|
53d56def8c | ||
|
|
458e90e44f | ||
|
|
876966d9ec | ||
|
|
a2ca73d4e4 | ||
|
|
95d650240f |
@@ -27,14 +27,12 @@ import (
|
||||
// Actions are triggered by new head from layer 1 geth node.
|
||||
// @todo It's better to be triggered by watcher.
|
||||
type Layer1Relayer struct {
|
||||
ctx context.Context
|
||||
sender *sender.Sender
|
||||
ctx context.Context
|
||||
|
||||
db orm.L1MessageOrm
|
||||
cfg *config.RelayerConfig
|
||||
|
||||
// channel used to communicate with transaction sender
|
||||
confirmationCh <-chan *sender.Confirmation
|
||||
sender *sender.Sender
|
||||
l2MessengerABI *abi.ABI
|
||||
|
||||
stopCh chan struct{}
|
||||
@@ -62,7 +60,6 @@ func NewLayer1Relayer(ctx context.Context, db orm.L1MessageOrm, cfg *config.Rela
|
||||
l2MessengerABI: l2MessengerABI,
|
||||
cfg: cfg,
|
||||
stopCh: make(chan struct{}),
|
||||
confirmationCh: sender.ConfirmChan(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -142,12 +139,12 @@ func (r *Layer1Relayer) Start() {
|
||||
// number, err := r.client.BlockNumber(r.ctx)
|
||||
// log.Info("receive header", "height", number)
|
||||
r.ProcessSavedEvents()
|
||||
case cfm := <-r.confirmationCh:
|
||||
case cfm := <-r.sender.ConfirmChan():
|
||||
if !cfm.IsSuccessful {
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, orm.MsgConfirmed, cfm.TxHash.String())
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.TxMeta.(string), orm.MsgConfirmed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// not sure if this will make problems when relay with l1geth
|
||||
@@ -39,25 +38,11 @@ type Layer2Relayer struct {
|
||||
cfg *config.RelayerConfig
|
||||
|
||||
messageSender *sender.Sender
|
||||
messageCh <-chan *sender.Confirmation
|
||||
l1MessengerABI *abi.ABI
|
||||
|
||||
rollupSender *sender.Sender
|
||||
rollupCh <-chan *sender.Confirmation
|
||||
l1RollupABI *abi.ABI
|
||||
|
||||
// A list of processing message.
|
||||
// key(string): confirmation ID, value(string): layer2 hash.
|
||||
processingMessage sync.Map
|
||||
|
||||
// A list of processing batch commitment.
|
||||
// key(string): confirmation ID, value(string): batch id.
|
||||
processingCommitment sync.Map
|
||||
|
||||
// A list of processing batch finalization.
|
||||
// key(string): confirmation ID, value(string): batch id.
|
||||
processingFinalization sync.Map
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
@@ -77,19 +62,14 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
}
|
||||
|
||||
return &Layer2Relayer{
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
messageSender: messageSender,
|
||||
messageCh: messageSender.ConfirmChan(),
|
||||
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
|
||||
rollupSender: rollupSender,
|
||||
rollupCh: rollupSender.ConfirmChan(),
|
||||
l1RollupABI: bridge_abi.RollupMetaABI,
|
||||
cfg: cfg,
|
||||
processingMessage: sync.Map{},
|
||||
processingCommitment: sync.Map{},
|
||||
processingFinalization: sync.Map{},
|
||||
stopCh: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
messageSender: messageSender,
|
||||
l1MessengerABI: bridge_abi.L1MessengerMetaABI,
|
||||
rollupSender: rollupSender,
|
||||
l1RollupABI: bridge_abi.RollupMetaABI,
|
||||
cfg: cfg,
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -165,7 +145,7 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.messageSender.SendTransaction(&sender.TxMeta{TxType: "MessageRelay", ID: msg.MsgHash}, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
@@ -187,7 +167,6 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
|
||||
return err
|
||||
}
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -265,9 +244,8 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-commit"
|
||||
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchCommitment", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
|
||||
@@ -281,7 +259,6 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
|
||||
if err != nil {
|
||||
log.Error("UpdateCommitTxHashAndRollupStatus failed", "id", id, "index", batch.Index, "err", err)
|
||||
}
|
||||
r.processingCommitment.Store(txID, id)
|
||||
}
|
||||
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
@@ -369,9 +346,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
return
|
||||
}
|
||||
|
||||
txID := id + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
txHash, err := r.rollupSender.SendTransaction(&sender.TxMeta{TxType: "BatchFinalization", ID: id}, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
@@ -387,7 +363,6 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", id, "err", err)
|
||||
}
|
||||
success = true
|
||||
r.processingFinalization.Store(txID, id)
|
||||
|
||||
default:
|
||||
log.Error("encounter unreachable case in ProcessCommittedBatches",
|
||||
@@ -424,9 +399,9 @@ func (r *Layer2Relayer) Start() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case confirmation := <-r.messageCh:
|
||||
case confirmation := <-r.messageSender.ConfirmChan():
|
||||
r.handleConfirmation(confirmation)
|
||||
case confirmation := <-r.rollupCh:
|
||||
case confirmation := <-r.rollupSender.ConfirmChan():
|
||||
r.handleConfirmation(confirmation)
|
||||
}
|
||||
}
|
||||
@@ -447,39 +422,24 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
|
||||
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
|
||||
return
|
||||
}
|
||||
|
||||
transactionType := "Unknown"
|
||||
// check whether it is message relay transaction
|
||||
if msgHash, ok := r.processingMessage.Load(confirmation.ID); ok {
|
||||
transactionType = "MessageRelay"
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msgHash.(string), orm.MsgConfirmed, confirmation.TxHash.String())
|
||||
txMeta, _ := confirmation.TxMeta.(*sender.TxMeta)
|
||||
switch txMeta.TxType {
|
||||
case "MessageRelay":
|
||||
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, txMeta.ID, orm.MsgConfirmed, confirmation.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msgHash.(string), "err", err)
|
||||
log.Warn("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", txMeta.ID, "err", err)
|
||||
}
|
||||
r.processingMessage.Delete(confirmation.ID)
|
||||
}
|
||||
|
||||
// check whether it is block commitment transaction
|
||||
if batchID, ok := r.processingCommitment.Load(confirmation.ID); ok {
|
||||
transactionType = "BatchCommitment"
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchID.(string), confirmation.TxHash.String(), orm.RollupCommitted)
|
||||
case "BatchCommitment":
|
||||
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, txMeta.ID, confirmation.TxHash.String(), orm.RollupCommitted)
|
||||
if err != nil {
|
||||
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "batch_id", batchID.(string), "err", err)
|
||||
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "batch_id", txMeta.ID, "err", err)
|
||||
}
|
||||
r.processingCommitment.Delete(confirmation.ID)
|
||||
}
|
||||
|
||||
// check whether it is proof finalization transaction
|
||||
if batchID, ok := r.processingFinalization.Load(confirmation.ID); ok {
|
||||
transactionType = "ProofFinalization"
|
||||
case "BatchFinalization":
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, batchID.(string), confirmation.TxHash.String(), orm.RollupFinalized)
|
||||
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, txMeta.ID, confirmation.TxHash.String(), orm.RollupFinalized)
|
||||
if err != nil {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", batchID.(string), "err", err)
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_id", txMeta.ID, "err", err)
|
||||
}
|
||||
r.processingFinalization.Delete(confirmation.ID)
|
||||
}
|
||||
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
|
||||
log.Info("transaction confirmed in layer1", "type", txMeta.TxType, "confirmation", confirmation)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ var (
|
||||
templateL2Message = []*orm.L2Message{
|
||||
{
|
||||
Nonce: 1,
|
||||
MsgHash: "msgHash",
|
||||
Height: 1,
|
||||
Sender: "0x596a746661dbed76a84556111c2872249b070e15",
|
||||
Value: "100",
|
||||
|
||||
@@ -3,6 +3,7 @@ package sender
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
@@ -41,19 +42,15 @@ var (
|
||||
ErrNoAvailableAccount = errors.New("sender has no available account to send transaction")
|
||||
)
|
||||
|
||||
// DefaultSenderConfig The default config
|
||||
var DefaultSenderConfig = config.SenderConfig{
|
||||
Endpoint: "",
|
||||
EscalateBlocks: 3,
|
||||
EscalateMultipleNum: 11,
|
||||
EscalateMultipleDen: 10,
|
||||
MaxGasPrice: 1000_000_000_000, // this is 1000 gwei
|
||||
TxType: AccessListTxType,
|
||||
// TxMeta is prepared if one sender instance be used in two more places.
|
||||
type TxMeta struct {
|
||||
TxType string
|
||||
ID string
|
||||
}
|
||||
|
||||
// Confirmation struct used to indicate transaction confirmation details
|
||||
type Confirmation struct {
|
||||
ID string
|
||||
TxMeta interface{}
|
||||
IsSuccessful bool
|
||||
TxHash common.Hash
|
||||
}
|
||||
@@ -70,7 +67,7 @@ type FeeData struct {
|
||||
// PendingTransaction submitted but pending transactions
|
||||
type PendingTransaction struct {
|
||||
submitAt uint64
|
||||
id string
|
||||
txMeta interface{}
|
||||
feeData *FeeData
|
||||
signer *bind.TransactOpts
|
||||
tx *types.Transaction
|
||||
@@ -98,7 +95,7 @@ type Sender struct {
|
||||
// txConfirmationCh is used to notify confirmed transaction
|
||||
func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.PrivateKey) (*Sender, error) {
|
||||
if config == nil {
|
||||
config = &DefaultSenderConfig
|
||||
return nil, errors.New("create sender instance failed due to the empty config")
|
||||
}
|
||||
client, err := ethclient.Dial(config.Endpoint)
|
||||
if err != nil {
|
||||
@@ -199,23 +196,44 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
|
||||
}, nil
|
||||
}
|
||||
|
||||
// marshal txMeta if it's not empty.
|
||||
func marshalTxMeta(val interface{}) string {
|
||||
switch msg := val.(type) {
|
||||
case nil:
|
||||
return ""
|
||||
case string:
|
||||
return msg
|
||||
case *TxMeta:
|
||||
if msg.ID == "" || msg.TxType == "" {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
data, _ := json.Marshal(val)
|
||||
return string(data)
|
||||
}
|
||||
|
||||
// SendTransaction send a signed L2tL1 transaction.
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
|
||||
func (s *Sender) SendTransaction(txMeta interface{}, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
|
||||
// Use marshal message as id, in order avoid repeat txMetas.
|
||||
var metaID string
|
||||
if metaID = marshalTxMeta(txMeta); metaID == "" {
|
||||
return common.Hash{}, fmt.Errorf("empty txMeta is not allowed")
|
||||
}
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(metaID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("attempted to send duplicate transaction, txMeta: %v", txMeta)
|
||||
}
|
||||
// get
|
||||
auth := s.auths.getAccount()
|
||||
if auth == nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
s.pendingTxs.Delete(metaID) // release the ID on failure
|
||||
return common.Hash{}, ErrNoAvailableAccount
|
||||
}
|
||||
|
||||
defer s.auths.releaseAccount(auth)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
s.pendingTxs.Delete(metaID) // release the ID on failure
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -231,12 +249,12 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
// add pending transaction to queue
|
||||
pending := &PendingTransaction{
|
||||
tx: tx,
|
||||
id: ID,
|
||||
txMeta: txMeta,
|
||||
signer: auth,
|
||||
submitAt: atomic.LoadUint64(&s.blockNumber),
|
||||
feeData: feeData,
|
||||
}
|
||||
s.pendingTxs.Store(ID, pending)
|
||||
s.pendingTxs.Store(metaID, pending)
|
||||
return tx.Hash(), nil
|
||||
}
|
||||
|
||||
@@ -263,7 +281,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
|
||||
GasPrice: feeData.gasPrice,
|
||||
Gas: feeData.gasLimit,
|
||||
To: target,
|
||||
Value: new(big.Int).Set(value),
|
||||
Value: value,
|
||||
Data: common.CopyBytes(data),
|
||||
V: new(big.Int),
|
||||
R: new(big.Int),
|
||||
@@ -276,7 +294,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
|
||||
GasPrice: feeData.gasPrice,
|
||||
Gas: feeData.gasLimit,
|
||||
To: target,
|
||||
Value: new(big.Int).Set(value),
|
||||
Value: value,
|
||||
Data: common.CopyBytes(data),
|
||||
AccessList: make(types.AccessList, 0),
|
||||
V: new(big.Int),
|
||||
@@ -290,7 +308,7 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
|
||||
Data: common.CopyBytes(data),
|
||||
Gas: feeData.gasLimit,
|
||||
AccessList: make(types.AccessList, 0),
|
||||
Value: new(big.Int).Set(value),
|
||||
Value: value,
|
||||
ChainID: s.chainID,
|
||||
GasTipCap: feeData.gasTipCap,
|
||||
GasFeeCap: feeData.gasFeeCap,
|
||||
@@ -390,7 +408,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
s.pendingTxs.Delete(key)
|
||||
// send confirm message
|
||||
s.confirmCh <- &Confirmation{
|
||||
ID: pending.id,
|
||||
TxMeta: pending.txMeta,
|
||||
IsSuccessful: receipt.Status == types.ReceiptStatusSuccessful,
|
||||
TxHash: pending.tx.Hash(),
|
||||
}
|
||||
@@ -424,7 +442,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
if (err == nil) && (receipt != nil) {
|
||||
// send confirm message
|
||||
s.confirmCh <- &Confirmation{
|
||||
ID: pending.id,
|
||||
TxMeta: pending.txMeta,
|
||||
IsSuccessful: receipt.Status == types.ReceiptStatusSuccessful,
|
||||
TxHash: pending.tx.Hash(),
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ func TestSender(t *testing.T) {
|
||||
// Setup
|
||||
setupEnv(t)
|
||||
|
||||
t.Run("testSenderMsg", testSenderMsg)
|
||||
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
|
||||
t.Run("test 3 account sender", func(t *testing.T) { testBatchSender(t, 3) })
|
||||
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
|
||||
@@ -59,6 +60,38 @@ func TestSender(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func testSenderMsg(t *testing.T) {
|
||||
var tests = []struct {
|
||||
txMeta interface{}
|
||||
isFail bool
|
||||
}{
|
||||
{1, false},
|
||||
{nil, true},
|
||||
{"", true},
|
||||
{&sender.TxMeta{}, true},
|
||||
{&sender.TxMeta{TxType: "", ID: "1"}, true},
|
||||
{&sender.TxMeta{TxType: "1", ID: ""}, true},
|
||||
}
|
||||
newSender, err := sender.NewSender(context.Background(), cfg.L1Config.RelayerConfig.SenderConfig, privateKeys)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
|
||||
for i, test := range tests {
|
||||
_, err = newSender.SendTransaction(test.txMeta, &toAddr, nil, nil)
|
||||
if test.isFail && err == nil {
|
||||
t.Errorf("Test %d should fail", i)
|
||||
continue
|
||||
}
|
||||
if !test.isFail && err != nil {
|
||||
t.Errorf("Test %d should pass but got err: %v", i, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testBatchSender(t *testing.T, batchSize int) {
|
||||
for len(privateKeys) < batchSize {
|
||||
priv, err := crypto.GenerateKey()
|
||||
@@ -78,9 +111,8 @@ func testBatchSender(t *testing.T, batchSize int) {
|
||||
|
||||
// send transactions
|
||||
var (
|
||||
eg errgroup.Group
|
||||
idCache = cmap.New()
|
||||
confirmCh = newSender.ConfirmChan()
|
||||
eg errgroup.Group
|
||||
idCache = cmap.New()
|
||||
)
|
||||
for idx := 0; idx < newSender.NumberOfAccounts(); idx++ {
|
||||
index := idx
|
||||
@@ -110,9 +142,9 @@ func testBatchSender(t *testing.T, batchSize int) {
|
||||
after := time.After(80 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case cmsg := <-confirmCh:
|
||||
case cmsg := <-newSender.ConfirmChan():
|
||||
assert.Equal(t, true, cmsg.IsSuccessful)
|
||||
_, exist := idCache.Pop(cmsg.ID)
|
||||
_, exist := idCache.Pop(cmsg.TxMeta.(string))
|
||||
assert.Equal(t, true, exist)
|
||||
// Receive all confirmed txs.
|
||||
if idCache.Count() == 0 {
|
||||
|
||||
@@ -292,14 +292,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
}
|
||||
|
||||
var err error
|
||||
tasks, err := m.orm.GetBlockBatches(map[string]interface{}{"id": msg.ID})
|
||||
if len(tasks) == 0 {
|
||||
if err != nil {
|
||||
log.Error("failed to get tasks", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
success, err = m.verifier.VerifyProof(msg.Proof)
|
||||
if err != nil {
|
||||
// TODO: this is only a temp workaround for testnet, we should return err in real cases
|
||||
|
||||
@@ -61,7 +61,7 @@ func testMonitorMetrics(t *testing.T) {
|
||||
runDBCliApp(t, "migrate", "current version:")
|
||||
|
||||
// Start bridge process with metrics server.
|
||||
port, _ := rand.Int(rand.Reader, big.NewInt(2000))
|
||||
port, _ := rand.Int(rand.Reader, big.NewInt(10000))
|
||||
svrPort := strconv.FormatInt(port.Int64()+50000, 10)
|
||||
bridgeCmd := runBridgeApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort)
|
||||
bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(time.Second*20, "Start bridge successfully") })
|
||||
|
||||
Reference in New Issue
Block a user