mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-14 00:18:03 -05:00
Co-authored-by: colinlyguo <651734127@qq.com> Co-authored-by: zimpha <zimpha@gmail.com> Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com> Co-authored-by: HAOYUatHZ <haoyu@protonmail.com>
180 lines
5.8 KiB
Go
180 lines
5.8 KiB
Go
package orm
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/scroll-tech/go-ethereum/log"
|
|
|
|
"scroll-tech/common/types"
|
|
)
|
|
|
|
type l1MessageOrm struct {
|
|
db *sqlx.DB
|
|
}
|
|
|
|
var _ L1MessageOrm = (*l1MessageOrm)(nil)
|
|
|
|
// NewL1MessageOrm create an L1MessageOrm instance
|
|
func NewL1MessageOrm(db *sqlx.DB) L1MessageOrm {
|
|
return &l1MessageOrm{db: db}
|
|
}
|
|
|
|
// GetL1MessageByMsgHash fetch message by queue_index
|
|
func (m *l1MessageOrm) GetL1MessageByMsgHash(msgHash string) (*types.L1Message, error) {
|
|
msg := types.L1Message{}
|
|
|
|
row := m.db.QueryRowx(`SELECT queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash, status FROM l1_message WHERE msg_hash = $1`, msgHash)
|
|
|
|
if err := row.StructScan(&msg); err != nil {
|
|
return nil, err
|
|
}
|
|
return &msg, nil
|
|
}
|
|
|
|
// GetL1MessageByQueueIndex fetch message by queue_index
|
|
func (m *l1MessageOrm) GetL1MessageByQueueIndex(queueIndex uint64) (*types.L1Message, error) {
|
|
msg := types.L1Message{}
|
|
|
|
row := m.db.QueryRowx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, status FROM l1_message WHERE queue_index = $1`, queueIndex)
|
|
|
|
if err := row.StructScan(&msg); err != nil {
|
|
return nil, err
|
|
}
|
|
return &msg, nil
|
|
}
|
|
|
|
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
|
func (m *l1MessageOrm) GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]*types.L1Message, error) {
|
|
rows, err := m.db.Queryx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY queue_index ASC LIMIT $2;`, status, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var msgs []*types.L1Message
|
|
for rows.Next() {
|
|
msg := &types.L1Message{}
|
|
if err = rows.StructScan(&msg); err != nil {
|
|
break
|
|
}
|
|
msgs = append(msgs, msg)
|
|
}
|
|
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
|
|
// log.Warn("no unprocessed layer1 messages in db", "err", err)
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return msgs, rows.Close()
|
|
}
|
|
|
|
// GetL1ProcessedQueueIndex fetch latest processed message queue_index
|
|
func (m *l1MessageOrm) GetL1ProcessedQueueIndex() (int64, error) {
|
|
row := m.db.QueryRow(`SELECT MAX(queue_index) FROM l1_message WHERE status = $1;`, types.MsgConfirmed)
|
|
|
|
var queueIndex sql.NullInt64
|
|
if err := row.Scan(&queueIndex); err != nil {
|
|
if err == sql.ErrNoRows || !queueIndex.Valid {
|
|
// no row means no message
|
|
// since queueIndex starts with 0, return -1 as the processed queueIndex
|
|
return -1, nil
|
|
}
|
|
return 0, err
|
|
}
|
|
if queueIndex.Valid {
|
|
return queueIndex.Int64, nil
|
|
}
|
|
return -1, nil
|
|
}
|
|
|
|
// SaveL1Messages batch save a list of layer1 messages
|
|
func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*types.L1Message) error {
|
|
if len(messages) == 0 {
|
|
return nil
|
|
}
|
|
|
|
messageMaps := make([]map[string]interface{}, len(messages))
|
|
for i, msg := range messages {
|
|
|
|
messageMaps[i] = map[string]interface{}{
|
|
"queue_index": msg.QueueIndex,
|
|
"msg_hash": msg.MsgHash,
|
|
"height": msg.Height,
|
|
"sender": msg.Sender,
|
|
"target": msg.Target,
|
|
"value": msg.Value,
|
|
"gas_limit": msg.GasLimit,
|
|
"calldata": msg.Calldata,
|
|
"layer1_hash": msg.Layer1Hash,
|
|
}
|
|
}
|
|
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash) VALUES (:queue_index, :msg_hash, :height, :sender, :target, :value, :gas_limit, :calldata, :layer1_hash);`, messageMaps)
|
|
if err != nil {
|
|
queueIndices := make([]uint64, 0, len(messages))
|
|
heights := make([]uint64, 0, len(messages))
|
|
for _, msg := range messages {
|
|
queueIndices = append(queueIndices, msg.QueueIndex)
|
|
heights = append(heights, msg.Height)
|
|
}
|
|
log.Error("failed to insert l1Messages", "queueIndices", queueIndices, "heights", heights, "err", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// UpdateLayer2Hash update corresponding layer2 hash, given message hash
|
|
func (m *l1MessageOrm) UpdateLayer2Hash(ctx context.Context, msgHash, layer2Hash string) error {
|
|
if _, err := m.db.ExecContext(ctx, m.db.Rebind("update l1_message set layer2_hash = ? where msg_hash = ?;"), layer2Hash, msgHash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateLayer1Status updates message stauts, given message hash
|
|
func (m *l1MessageOrm) UpdateLayer1Status(ctx context.Context, msgHash string, status types.MsgStatus) error {
|
|
if _, err := m.db.ExecContext(ctx, m.db.Rebind("update l1_message set status = ? where msg_hash = ?;"), status, msgHash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateLayer1StatusAndLayer2Hash updates message status and layer2 transaction hash, given message hash
|
|
func (m *l1MessageOrm) UpdateLayer1StatusAndLayer2Hash(ctx context.Context, msgHash string, status types.MsgStatus, layer2Hash string) error {
|
|
if _, err := m.db.ExecContext(ctx, m.db.Rebind("update l1_message set status = ?, layer2_hash = ? where msg_hash = ?;"), status, layer2Hash, msgHash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetLayer1LatestWatchedHeight returns latest height stored in the table
|
|
func (m *l1MessageOrm) GetLayer1LatestWatchedHeight() (int64, error) {
|
|
// @note It's not correct, since we may don't have message in some blocks.
|
|
// But it will only be called at start, some redundancy is acceptable.
|
|
row := m.db.QueryRow("SELECT MAX(height) FROM l1_message;")
|
|
|
|
var height sql.NullInt64
|
|
if err := row.Scan(&height); err != nil {
|
|
if err == sql.ErrNoRows || !height.Valid {
|
|
return -1, nil
|
|
}
|
|
return 0, err
|
|
}
|
|
if height.Valid {
|
|
return height.Int64, nil
|
|
}
|
|
return -1, nil
|
|
}
|
|
|
|
func (m *l1MessageOrm) GetRelayL1MessageTxHash(queueIndex uint64) (sql.NullString, error) {
|
|
row := m.db.QueryRow(`SELECT layer2_hash FROM l1_message WHERE queue_index = $1`, queueIndex)
|
|
var hash sql.NullString
|
|
if err := row.Scan(&hash); err != nil {
|
|
return sql.NullString{}, err
|
|
}
|
|
return hash, nil
|
|
}
|