add resend unconfirm txs logic

This commit is contained in:
maskpp
2023-04-04 14:35:26 +08:00
parent 068e35705a
commit 340dc45108
5 changed files with 161 additions and 38 deletions

View File

@@ -117,7 +117,7 @@ type TxOrm interface {
SaveTx(id, sender string, tx *etypes.Transaction) error
UpdateTxMsgByID(hash string, txHash string) error
GetTxByID(id string) (*types.TxMessage, error)
GetL1TxMessages(fields map[string]interface{}, args ...string) ([]*types.TxMessage, error)
GetL2TxMessages(fields map[string]interface{}, args ...string) ([]*types.TxMessage, error)
GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) ([]*types.TxMessage, error)
GetL1TxMessages(fields map[string]interface{}, args ...string) (uint64, []*types.TxMessage, error)
GetL2TxMessages(fields map[string]interface{}, args ...string) (uint64, []*types.TxMessage, error)
GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) (uint64, []*types.TxMessage, error)
}

View File

@@ -2,6 +2,7 @@ package orm
import (
"fmt"
"modernc.org/mathutil"
"strings"
"github.com/jmoiron/sqlx"
@@ -71,79 +72,100 @@ func (t *txOrm) GetTxByID(id string) (*stypes.TxMessage, error) {
// where 1 = 1 AND status = :status AND queue_index > 0
// ORDER BY queue_index ASC
// LIMIT 10) as l1 on tx.id = l1.msg_hash;
func (t *txOrm) GetL1TxMessages(fields map[string]interface{}, args ...string) ([]*stypes.TxMessage, error) {
query := "select msg_hash from l1_message where 1 = 1"
func (t *txOrm) GetL1TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.TxMessage, error) {
query := "select msg_hash, queue_index from l1_message where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)
}
query = strings.Join(append([]string{query}, args...), " ")
query = fmt.Sprintf("select l1.msg_hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as l1 on tx.id = l1.msg_hash;", query)
query = fmt.Sprintf("select l1.queue_index as index, l1.msg_hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as l1 on tx.id = l1.msg_hash;", query)
db := t.db
rows, err := db.NamedQuery(db.Rebind(query), fields)
if err != nil {
return nil, err
return 0, nil, err
}
var txMsgs []*stypes.TxMessage
var (
index uint64
txMsgs []*stypes.TxMessage
)
for rows.Next() {
msg := &stypes.TxMessage{}
if err = rows.StructScan(msg); err != nil {
return nil, err
warp := struct {
Index uint64 `db:"index"`
*stypes.TxMessage
}{}
if err = rows.StructScan(&warp); err != nil {
return 0, nil, err
}
txMsgs = append(txMsgs, msg)
index = mathutil.MaxUint64(index, warp.Index)
txMsgs = append(txMsgs, warp.TxMessage)
}
return txMsgs, nil
return index, txMsgs, nil
}
// GetL2TxMessages gets tx messages by transaction right join l2_message.
func (t *txOrm) GetL2TxMessages(fields map[string]interface{}, args ...string) ([]*stypes.TxMessage, error) {
query := "select msg_hash from l2_message where 1 = 1"
func (t *txOrm) GetL2TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.TxMessage, error) {
query := "select msg_hash, nonce from l2_message where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)
}
query = strings.Join(append([]string{query}, args...), " ")
query = fmt.Sprintf("select l2.msg_hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as l2 on tx.id = l2.msg_hash;", query)
query = fmt.Sprintf("select l2.nonce as l2_nonce, l2.msg_hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as l2 on tx.id = l2.msg_hash;", query)
db := t.db
rows, err := db.NamedQuery(db.Rebind(query), fields)
if err != nil {
return nil, err
return 0, nil, err
}
var txMsgs []*stypes.TxMessage
var (
nonce uint64
txMsgs []*stypes.TxMessage
)
for rows.Next() {
msg := &stypes.TxMessage{}
if err = rows.StructScan(msg); err != nil {
return nil, err
warp := struct {
Nonce uint64 `db:"l2_nonce"`
*stypes.TxMessage
}{}
if err = rows.StructScan(&warp); err != nil {
return 0, nil, err
}
txMsgs = append(txMsgs, msg)
nonce = mathutil.MaxUint64(nonce, warp.Nonce)
txMsgs = append(txMsgs, warp.TxMessage)
}
return txMsgs, nil
return nonce, txMsgs, nil
}
// GetBlockBatchTxMessages gets tx messages by transaction right join block_batch.
func (t *txOrm) GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) ([]*stypes.TxMessage, error) {
query := "select hash from block_batch where 1 = 1"
func (t *txOrm) GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.TxMessage, error) {
query := "select hash, index from block_batch where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)
}
query = strings.Join(append([]string{query}, args...), " ")
query = fmt.Sprintf("select bt.hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as bt on tx.id = bt.hash;", query)
query = fmt.Sprintf("select bt.index as index, bt.hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as bt on tx.id = bt.hash;", query)
db := t.db
rows, err := db.NamedQuery(db.Rebind(query), fields)
if err != nil {
return nil, err
return 0, nil, err
}
var txMsgs []*stypes.TxMessage
var (
index uint64
txMsgs []*stypes.TxMessage
)
for rows.Next() {
msg := &stypes.TxMessage{}
if err = rows.StructScan(msg); err != nil {
return nil, err
warp := struct {
Index uint64 `db:"index"`
*stypes.TxMessage
}{}
if err = rows.StructScan(&warp); err != nil {
return 0, nil, err
}
txMsgs = append(txMsgs, msg)
index = mathutil.MaxUint64(index, warp.Index)
txMsgs = append(txMsgs, warp.TxMessage)
}
return txMsgs, nil
return index, txMsgs, nil
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math/big"
"modernc.org/mathutil"
"os"
"testing"
"time"
@@ -93,8 +94,8 @@ var (
func setupEnv(t *testing.T) error {
// Init db config and start db container.
dbConfig = &database.DBConfig{DriverName: "postgres"}
base.RunImages(t)
dbConfig.DSN = base.DBEndpoint()
//base.RunImages(t)
dbConfig.DSN = "postgres://maskpp:123456@localhost:5432/postgres?sslmode=disable" //base.DBEndpoint()
// Create db handler and reset db.
factory, err := database.NewOrmFactory(dbConfig)
@@ -518,12 +519,14 @@ func testTxOrmGetL1TxMessages(t *testing.T) {
assert.NoError(t, err)
}
txMsgs, err := ormTx.GetL1TxMessages(
index, txMsgs, err := ormTx.GetL1TxMessages(
map[string]interface{}{"status": types.MsgSubmitted},
fmt.Sprintf("AND queue_index > %d", 0),
fmt.Sprintf("ORDER BY queue_index ASC LIMIT %d", 10),
)
assert.NoError(t, err)
// check index is the biggest one.
assert.Equal(t, templateL1Message[1].QueueIndex, index)
assert.Equal(t, len(templateL1Message), len(txMsgs))
// The first field is full.
assert.Equal(t, templateL1Message[0].MsgHash, txMsgs[0].ID)
@@ -552,12 +555,13 @@ func testTxOrmGetL2TxMessages(t *testing.T) {
assert.NoError(t, err)
}
txMsgs, err := ormTx.GetL2TxMessages(
nonce, txMsgs, err := ormTx.GetL2TxMessages(
map[string]interface{}{"status": types.MsgSubmitted},
fmt.Sprintf("AND nonce > %d", 0),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", 10),
)
assert.NoError(t, err)
assert.Equal(t, templateL2Message[1].Nonce, nonce)
assert.Equal(t, len(templateL2Message), len(txMsgs))
assert.Equal(t, templateL2Message[0].MsgHash, txMsgs[0].ID)
assert.Equal(t, false, txMsgs[1].TxHash.Valid)
@@ -583,12 +587,14 @@ func testTxOrmGetBlockBatchTxMessages(t *testing.T) {
err = ormTx.SaveTx(batchData1.Hash().String(), auth.From.String(), signedTx)
assert.Nil(t, err)
txMsgs, err := ormTx.GetBlockBatchTxMessages(
batchIndex, txMsgs, err := ormTx.GetBlockBatchTxMessages(
map[string]interface{}{"rollup_status": types.RollupPending},
fmt.Sprintf("AND index > %d", 0),
fmt.Sprintf("ORDER BY index ASC LIMIT %d", 10),
)
assert.NoError(t, err)
// Check bath index is the biggest one.
assert.Equal(t, mathutil.MaxUint64(batchData1.Batch.BatchIndex, batchData2.Batch.BatchIndex), batchIndex)
assert.Equal(t, 2, len(txMsgs))
assert.Equal(t, batchData1.Hash().String(), txMsgs[0].ID)
assert.Equal(t, false, txMsgs[1].TxHash.Valid)