Compare commits

..

4 Commits

Author SHA1 Message Date
ChuhanJin
3d9fce26b6 refactor(bridge-history-api): add combine constraint (#585)
Co-authored-by: vincent <419436363@qq.com>
Co-authored-by: georgehao <haohongfan@gmail.com>
Co-authored-by: colinlyguo <colinlyguo@scroll.io>
2023-07-04 13:48:51 +08:00
georgehao
95124ce70e feat(coordinator): refactor coordinator session info (#604)
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-07-03 21:36:01 +08:00
Péter Garamvölgyi
f8d4855f26 fix: use QueueIndex as nonce for L1MessageTx (#603) 2023-06-30 15:55:14 +02:00
ChuhanJin
e303fafefc feat(bridge): user claim part2: disable bridge relay (#571)
Co-authored-by: vincent <419436363@qq.com>
Co-authored-by: colinlyguo <colinlyguo@scroll.io>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-06-30 16:23:32 +08:00
39 changed files with 441 additions and 1092 deletions

View File

@@ -75,7 +75,7 @@ func (c *CrossMsgFetcher) Start() {
return
case <-tick.C:
c.mu.Lock()
c.forwardFetchAndSaveMissingEvents(0)
c.forwardFetchAndSaveMissingEvents(1)
c.mu.Unlock()
}
}

View File

@@ -12,6 +12,7 @@ import (
backendabi "bridge-history-api/abi"
"bridge-history-api/db"
"bridge-history-api/db/orm"
"bridge-history-api/utils"
)
@@ -104,6 +105,14 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Error("l1FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
return err
}
for i := range depositL1CrossMsgs {
for _, msgHash := range msgHashes {
if depositL1CrossMsgs[i].Layer1Hash == msgHash.TxHash.Hex() {
depositL1CrossMsgs[i].MsgHash = msgHash.MsgHash.Hex()
break
}
}
}
dbTx, err := database.Beginx()
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
@@ -120,11 +129,6 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
dbTx.Rollback()
log.Crit("l1FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
}
err = updateL1CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
if err != nil {
dbTx.Rollback()
log.Crit("l1FetchAndSaveEvents: Failed to update msgHash in L1 cross msg", "err", err)
}
err = dbTx.Commit()
if err != nil {
// if we can not insert into DB, there must something wrong, need a on-call member handle the dababase manually
@@ -157,11 +161,22 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Warn("Failed to get l2 event logs", "err", err)
return err
}
depositL2CrossMsgs, msgHashes, relayedMsg, l2sentMsgs, err := utils.ParseBackendL2EventLogs(logs)
depositL2CrossMsgs, relayedMsg, L2SentMsgWrappers, err := utils.ParseBackendL2EventLogs(logs)
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
return err
}
var l2SentMsgs []*orm.L2SentMsg
for i := range depositL2CrossMsgs {
for _, l2SentMsgWrapper := range L2SentMsgWrappers {
if depositL2CrossMsgs[i].Layer2Hash == l2SentMsgWrapper.TxHash.Hex() {
depositL2CrossMsgs[i].MsgHash = l2SentMsgWrapper.L2SentMsg.MsgHash
l2SentMsgWrapper.L2SentMsg.TxSender = depositL2CrossMsgs[i].Sender
l2SentMsgs = append(l2SentMsgs, l2SentMsgWrapper.L2SentMsg)
break
}
}
}
dbTx, err := database.Beginx()
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
@@ -179,16 +194,12 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Crit("l2FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
}
err = updateL2CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to update msgHash in L2 cross msg", "err", err)
}
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2sentMsgs)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
if len(l2SentMsgs) > 0 {
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2SentMsgs)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
}
}
err = dbTx.Commit()

View File

@@ -12,30 +12,32 @@ create table cross_message
layer2_hash VARCHAR NOT NULL DEFAULT '',
layer1_token VARCHAR NOT NULL DEFAULT '',
layer2_token VARCHAR NOT NULL DEFAULT '',
token_id BIGINT NOT NULL DEFAULT 0,
asset SMALLINT NOT NULL,
msg_type SMALLINT NOT NULL,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
-- use array to support nft bridge
token_ids VARCHAR[] NOT NULL DEFAULT '{}',
-- use array to support nft bridge
token_amounts VARCHAR[] NOT NULL DEFAULT '{}',
block_timestamp TIMESTAMP(0) DEFAULT NULL,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
create unique index uk_msg_hash_msg_type
on cross_message (msg_hash, msg_type) where deleted_at IS NULL;
comment
on column cross_message.asset is 'ETH, ERC20, ERC721, ERC1155';
comment
on column cross_message.msg_type is 'unknown, l1msg, l2msg';
comment
on column cross_message.is_deleted is 'NotDeleted false, Deleted true';
CREATE INDEX idx_l1_msg_index ON cross_message (layer1_hash, deleted_at);
CREATE INDEX valid_l1_msg_index ON cross_message (layer1_hash, is_deleted);
CREATE INDEX idx_l2_msg_index ON cross_message (layer2_hash, deleted_at);
CREATE INDEX valid_l2_msg_index ON cross_message (layer2_hash, is_deleted);
CREATE INDEX valid_height_index ON cross_message (height, msg_type, is_deleted);
CREATE INDEX idx_height_msg_type_index ON cross_message (height, msg_type, deleted_at);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -49,22 +51,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON cross_message FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE cross_message SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON cross_message
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -7,17 +7,17 @@ create table relayed_msg
height BIGINT NOT NULL,
layer1_hash VARCHAR NOT NULL DEFAULT '',
layer2_hash VARCHAR NOT NULL DEFAULT '',
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
comment
on column relayed_msg.is_deleted is 'NotDeleted, Deleted';
create unique index uk_msg_hash
on relayed_msg (msg_hash) where deleted_at IS NULL;
create unique index relayed_msg_hash_uindex
on relayed_msg (msg_hash);
CREATE INDEX idx_l1_msg_index ON relayed_msg (layer1_hash, deleted_at);
CREATE INDEX idx_l2_msg_index ON relayed_msg (layer2_hash, deleted_at);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -31,22 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON relayed_msg FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE relayed_msg SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON relayed_msg
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -3,6 +3,7 @@
create table l2_sent_msg
(
id BIGSERIAL PRIMARY KEY,
tx_sender VARCHAR NOT NULL,
sender VARCHAR NOT NULL,
target VARCHAR NOT NULL,
value VARCHAR NOT NULL,
@@ -12,14 +13,16 @@ create table l2_sent_msg
batch_index BIGINT NOT NULL DEFAULT 0,
msg_proof TEXT NOT NULL DEFAULT '',
msg_data TEXT NOT NULL DEFAULT '',
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
comment
on column l2_sent_msg.is_deleted is 'NotDeleted, Deleted';
create unique index uk_msg_hash
on l2_sent_msg (msg_hash) where deleted_at IS NULL;
create unique index uk_nonce
on l2_sent_msg (nonce) where deleted_at IS NULL;
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -33,22 +36,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON l2_sent_msg FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE l2_sent_msg SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON l2_sent_msg
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -8,12 +8,17 @@ create table rollup_batch
start_block_number BIGINT NOT NULL,
end_block_number BIGINT NOT NULL,
batch_hash VARCHAR NOT NULL,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
create unique index uk_batch_index
on rollup_batch (batch_index) where deleted_at IS NULL;
create unique index uk_batch_hash
on rollup_batch (batch_hash) where deleted_at IS NULL;
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
@@ -26,21 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON rollup_batch FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE rollup_batch SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON rollup_batch
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -2,7 +2,6 @@ package orm
import (
"database/sql"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/jmoiron/sqlx"
@@ -40,14 +39,6 @@ func (b *rollupBatchOrm) BatchInsertRollupBatchDBTx(dbTx *sqlx.Tx, batches []*Ro
"start_block_number": batch.StartBlockNumber,
"end_block_number": batch.EndBlockNumber,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM rollup_batch WHERE batch_index = $1 AND NOT is_deleted)`, batch.BatchIndex).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertRollupBatchDBTx: batch index %v already exists at height %v", batch.BatchIndex, batch.CommitHeight)
}
}
_, err = dbTx.NamedExec(`insert into rollup_batch(commit_height, batch_index, batch_hash, start_block_number, end_block_number) values(:commit_height, :batch_index, :batch_hash, :start_block_number, :end_block_number);`, batchMaps)
if err != nil {

View File

@@ -40,31 +40,24 @@ const (
// CrossMsg represents a cross message from layer 1 to layer 2
type CrossMsg struct {
ID uint64 `json:"id" db:"id"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
Amount string `json:"amount" db:"amount"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Layer1Token string `json:"layer1_token" db:"layer1_token"`
Layer2Token string `json:"layer2_token" db:"layer2_token"`
TokenID uint64 `json:"token_id" db:"token_id"`
Asset int `json:"asset" db:"asset"`
MsgType int `json:"msg_type" db:"msg_type"`
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
type RelayedMsg struct {
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
ID uint64 `json:"id" db:"id"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
Amount string `json:"amount" db:"amount"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Layer1Token string `json:"layer1_token" db:"layer1_token"`
Layer2Token string `json:"layer2_token" db:"layer2_token"`
TokenIDs []string `json:"token_ids" db:"token_ids"`
TokenAmounts []string `json:"token_amounts" db:"token_amounts"`
Asset int `json:"asset" db:"asset"`
MsgType int `json:"msg_type" db:"msg_type"`
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
// L1CrossMsgOrm provides operations on l1_cross_message table

View File

@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -23,7 +22,7 @@ func NewL1CrossMsgOrm(db *sqlx.DB) L1CrossMsgOrm {
func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, error) {
result := &CrossMsg{}
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND NOT is_deleted;`, l1Hash.String(), Layer1Msg)
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND deleted_at IS NULL;`, l1Hash.String(), Layer1Msg)
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -37,7 +36,7 @@ func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, erro
// Warning: return empty slice if no data found
func (l *l1CrossMsgOrm) GetL1CrossMsgsByAddress(sender common.Address) ([]*CrossMsg, error) {
var results []*CrossMsg
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND NOT is_deleted;`, sender.String(), Layer1Msg)
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND deleted_at IS NULL;`, sender.String(), Layer1Msg)
for rows.Next() {
msg := &CrossMsg{}
@@ -69,19 +68,11 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
"layer1_hash": msg.Layer1Hash,
"layer1_token": msg.Layer1Token,
"layer2_token": msg.Layer2Token,
"token_id": msg.TokenID,
"token_ids": msg.TokenIDs,
"msg_type": Layer1Msg,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer1_hash = $1 AND NOT is_deleted)`, msg.Layer1Hash).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL1CrossMsgDBTx: l1 cross msg layer1Hash %v already exists at height %v", msg.Layer1Hash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
if err != nil {
log.Error("BatchInsertL1CrossMsgDBTx: failed to insert l1 cross msgs", "err", err)
return err
@@ -92,7 +83,7 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
// UpdateL1CrossMsgHashDBTx update l1 cross msg hash in db, no need to check msg_type since layer1_hash wont be empty if its layer1 msg
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l1Hash, msgHash common.Hash) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
return err
}
return nil
@@ -100,7 +91,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx
}
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHash common.Hash) error {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
return err
}
return nil
@@ -108,7 +99,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHas
}
func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer1Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer1Msg)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -123,21 +114,21 @@ func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
}
func (l *l1CrossMsgOrm) DeleteL1CrossMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
if _, err := l.db.Exec(`UPDATE cross_message SET is_deleted = true WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET deleted_at = current_timestamp WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
return err
}
return nil
}
func (l *l1CrossMsgOrm) UpdateL1BlockTimestamp(height uint64, timestamp time.Time) error {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer1Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer1Msg); err != nil {
return err
}
return nil
}
func (l *l1CrossMsgOrm) GetL1EarliestNoBlockTimestampHeight() (uint64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer1Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer1Msg)
var result uint64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows {

View File

@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -23,7 +22,7 @@ func NewL2CrossMsgOrm(db *sqlx.DB) L2CrossMsgOrm {
func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, error) {
result := &CrossMsg{}
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted;`, l2Hash.String())
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND deleted_at IS NULL;`, l2Hash.String())
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -37,7 +36,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, erro
// Warning: return empty slice if no data found
func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossMsg, error) {
var results []*CrossMsg
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND NOT is_deleted;`, sender.String(), Layer2Msg)
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND deleted_at IS NULL;`, sender.String(), Layer2Msg)
for rows.Next() {
msg := &CrossMsg{}
@@ -56,7 +55,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossM
}
func (l *l2CrossMsgOrm) DeleteL2CrossMsgFromHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE cross_message SET is_deleted = true where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
_, err := dbTx.Exec(`UPDATE cross_message SET deleted_at = current_timestamp where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
if err != nil {
log.Error("DeleteL1CrossMsgAfterHeightDBTx: failed to delete", "height", height, "err", err)
return err
@@ -81,20 +80,12 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
"layer2_hash": msg.Layer2Hash,
"layer1_token": msg.Layer1Token,
"layer2_token": msg.Layer2Token,
"token_id": msg.TokenID,
"token_ids": msg.TokenIDs,
"amount": msg.Amount,
"msg_type": Layer2Msg,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted)`, msg.Layer2Hash).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL2CrossMsgDBTx: l2 cross msg layer2Hash %v already exists at height %v", msg.Layer2Hash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
if err != nil {
log.Error("BatchInsertL2CrossMsgDBTx: failed to insert l2 cross msgs", "err", err)
return err
@@ -103,21 +94,21 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
}
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l2Hash, msgHash common.Hash) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHash(ctx context.Context, l2Hash, msgHash common.Hash) error {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer2Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer2Msg)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -132,14 +123,14 @@ func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
}
func (l *l2CrossMsgOrm) UpdateL2BlockTimestamp(height uint64, timestamp time.Time) error {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer2Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer2Msg); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) GetL2EarliestNoBlockTimestampHeight() (uint64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer2Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer2Msg)
var result uint64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows {

View File

@@ -3,7 +3,6 @@ package orm
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
@@ -12,6 +11,7 @@ import (
type L2SentMsg struct {
ID uint64 `json:"id" db:"id"`
TxSender string `json:"tx_sender" db:"tx_sender"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
@@ -21,7 +21,6 @@ type L2SentMsg struct {
BatchIndex uint64 `json:"batch_index" db:"batch_index"`
MsgProof string `json:"msg_proof" db:"msg_proof"`
MsgData string `json:"msg_data" db:"msg_data"`
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
@@ -38,7 +37,7 @@ func NewL2SentMsgOrm(db *sqlx.DB) L2SentMsgOrm {
func (l *l2SentMsgOrm) GetL2SentMsgByHash(msgHash string) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msgHash)
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msgHash)
if err := row.StructScan(result); err != nil {
return nil, err
}
@@ -53,6 +52,7 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
messageMaps := make([]map[string]interface{}, len(messages))
for i, msg := range messages {
messageMaps[i] = map[string]interface{}{
"tx_sender": msg.TxSender,
"sender": msg.Sender,
"target": msg.Target,
"value": msg.Value,
@@ -63,25 +63,17 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
"msg_proof": msg.MsgProof,
"msg_data": msg.MsgData,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM l2_sent_msg WHERE (msg_hash = $1 OR nonce = $2) AND NOT is_deleted)`, msg.MsgHash, msg.Nonce).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL2SentMsgDBTx: l2 sent msg_hash %v already exists at height %v", msg.MsgHash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into l2_sent_msg(sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
_, err = dbTx.NamedExec(`insert into l2_sent_msg(tx_sender, sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:tx_sender, :sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
if err != nil {
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "msg_Hash", "err", err)
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "err", err)
return err
}
return err
}
func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE NOT is_deleted ORDER BY nonce DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE deleted_at IS NULL ORDER BY nonce DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -96,14 +88,14 @@ func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
}
func (l *l2SentMsgOrm) UpdateL2MessageProofInDBTx(ctx context.Context, dbTx *sqlx.Tx, msgHash string, proof string, batch_index uint64) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND NOT is_deleted;"), proof, batch_index, msgHash); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND deleted_at IS NULL;"), proof, batch_index, msgHash); err != nil {
return err
}
return nil
}
func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND NOT is_deleted ORDER BY batch_index DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND deleted_at IS NULL ORDER BY batch_index DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -119,7 +111,7 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight uint64) ([]*L2SentMsg, error) {
var results []*L2SentMsg
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND NOT is_deleted ORDER BY nonce ASC;`, startHeight, endHeight)
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND deleted_at IS NULL ORDER BY nonce ASC;`, startHeight, endHeight)
if err != nil {
return nil, err
}
@@ -135,7 +127,7 @@ func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight u
func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND NOT is_deleted;`, nonce)
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND deleted_at IS NULL;`, nonce)
err := row.StructScan(result)
if err != nil {
return nil, err
@@ -145,7 +137,7 @@ func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error)
func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND NOT is_deleted order by nonce desc limit 1`, endBlockNumber)
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND deleted_at IS NULL order by nonce desc limit 1`, endBlockNumber)
err := row.StructScan(result)
if err != nil {
return nil, err
@@ -154,6 +146,6 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2Sen
}
func (l *l2SentMsgOrm) DeleteL2SentMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET is_deleted = true WHERE height > $1;`, height)
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET deleted_at = current_timestamp WHERE height > $1;`, height)
return err
}

View File

@@ -8,6 +8,13 @@ import (
"github.com/jmoiron/sqlx"
)
type RelayedMsg struct {
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
}
type relayedMsgOrm struct {
db *sqlx.DB
}
@@ -33,7 +40,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
}
_, err = dbTx.NamedExec(`insert into relayed_msg(msg_hash, height, layer1_hash, layer2_hash) values(:msg_hash, :height, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil {
log.Error("BatchInsertRelayedMsgDBTx: failed to insert l1 cross msgs", "msg_Hashe", "err", err)
log.Error("BatchInsertRelayedMsgDBTx: failed to insert relayed msgs", "err", err)
return err
}
return nil
@@ -41,7 +48,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error) {
result := &RelayedMsg{}
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msg_hash)
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msg_hash)
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -52,7 +59,7 @@ func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error
}
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -67,7 +74,7 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
}
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -82,11 +89,11 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
}
func (l *relayedMsgOrm) DeleteL1RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer1_hash != '';`, height)
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer1_hash != '';`, height)
return err
}
func (l *relayedMsgOrm) DeleteL2RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer2_hash != '';`, height)
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer2_hash != '';`, height)
return err
}

View File

@@ -68,7 +68,7 @@ func (o *ormFactory) Beginx() (*sqlx.Tx, error) {
func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, error) {
var count uint64
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND NOT is_deleted;`, sender)
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND deleted_at IS NULL;`, sender)
if err := row.Scan(&count); err != nil {
return 0, err
}
@@ -78,7 +78,7 @@ func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, erro
func (o *ormFactory) GetCrossMsgsByAddressWithOffset(sender string, offset int64, limit int64) ([]*orm.CrossMsg, error) {
para := sender
var results []*orm.CrossMsg
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND NOT is_deleted ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND deleted_at IS NULL ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
if err != nil || rows == nil {
return nil, err
}

View File

@@ -77,7 +77,7 @@ func GetCrossTxClaimInfo(msgHash string, db db.OrmFactory) *UserClaimInfo {
Value: l2sentMsg.Value,
Nonce: strconv.FormatUint(l2sentMsg.Nonce, 10),
Message: l2sentMsg.MsgData,
Proof: l2sentMsg.MsgProof,
Proof: "0x" + l2sentMsg.MsgProof,
BatchHash: batch.BatchHash,
BatchIndex: strconv.FormatUint(l2sentMsg.BatchIndex, 10),
}

View File

@@ -18,6 +18,11 @@ type MsgHashWrapper struct {
TxHash common.Hash
}
type L2SentMsgWrapper struct {
L2SentMsg *orm.L2SentMsg
TxHash common.Hash
}
type CachedParsedTxCalldata struct {
CallDataIndex uint64
BatchIndices []uint64
@@ -81,7 +86,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer1Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
})
case backendabi.L1DepositERC1155Sig:
event := backendabi.ERC1155MessageEvent{}
@@ -98,7 +103,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer1Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
Amount: event.Amount.String(),
})
case backendabi.L1SentMessageEventSignature:
@@ -131,15 +136,14 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
return l1CrossMsg, msgHashes, relayedMsgs, nil
}
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrapper, []*orm.RelayedMsg, []*orm.L2SentMsg, error) {
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []*orm.RelayedMsg, []L2SentMsgWrapper, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
var l2CrossMsg []*orm.CrossMsg
// this is use to confirm finalized l1 msg
var relayedMsgs []*orm.RelayedMsg
var l2SentMsg []*orm.L2SentMsg
var msgHashes []MsgHashWrapper
var l2SentMsg []L2SentMsgWrapper
for _, vlog := range logs {
switch vlog.Topics[0] {
case backendabi.L2WithdrawETHSig:
@@ -147,7 +151,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ETHGatewayABI, &event, "WithdrawETH", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawETH event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -162,7 +166,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2StandardERC20GatewayABI, &event, "WithdrawERC20", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC20 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -179,7 +183,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ERC721GatewayABI, &event, "WithdrawERC721", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC721 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -189,14 +193,14 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer2Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
})
case backendabi.L2WithdrawERC1155Sig:
event := backendabi.ERC1155MessageEvent{}
err := UnpackLog(backendabi.L2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC1155 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -206,7 +210,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer2Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
Amount: event.Amount.String(),
})
case backendabi.L2SentMessageEventSignature:
@@ -214,27 +218,28 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "SentMessage", vlog)
if err != nil {
log.Warn("Failed to unpack SentMessage event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
msgHash := ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message)
msgHashes = append(msgHashes, MsgHashWrapper{
MsgHash: msgHash,
TxHash: vlog.TxHash})
l2SentMsg = append(l2SentMsg, &orm.L2SentMsg{
Sender: event.Sender.Hex(),
Target: event.Target.Hex(),
Value: event.Value.String(),
MsgHash: msgHash.Hex(),
Height: vlog.BlockNumber,
Nonce: event.MessageNonce.Uint64(),
MsgData: hexutil.Encode(event.Message),
})
l2SentMsg = append(l2SentMsg,
L2SentMsgWrapper{
TxHash: vlog.TxHash,
L2SentMsg: &orm.L2SentMsg{
Sender: event.Sender.Hex(),
Target: event.Target.Hex(),
Value: event.Value.String(),
MsgHash: msgHash.Hex(),
Height: vlog.BlockNumber,
Nonce: event.MessageNonce.Uint64(),
MsgData: hexutil.Encode(event.Message),
},
})
case backendabi.L2RelayedMessageEventSignature:
event := backendabi.L2RelayedMessageEvent{}
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "RelayedMessage", vlog)
if err != nil {
log.Warn("Failed to unpack RelayedMessage event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{
MsgHash: event.MessageHash.String(),
@@ -244,7 +249,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
}
}
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, nil
return l2CrossMsg, relayedMsgs, l2SentMsg, nil
}
func ParseBatchInfoFromScrollChain(ctx context.Context, client *ethclient.Client, logs []types.Log) ([]*orm.RollupBatch, error) {

View File

@@ -44,11 +44,10 @@ type Layer2Relayer struct {
l2Client *ethclient.Client
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
l2MessageOrm *orm.L2Message
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
cfg *config.RelayerConfig
@@ -120,10 +119,9 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.
ctx: ctx,
db: db,
batchOrm: orm.NewBatch(db),
l2MessageOrm: orm.NewL2Message(db),
l2BlockOrm: orm.NewL2Block(db),
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
l2BlockOrm: orm.NewL2Block(db),
chunkOrm: orm.NewChunk(db),
l2Client: l2Client,
@@ -521,7 +519,6 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
transactionType := "Unknown"
// check whether it is CommitBatches transaction
if batchHash, ok := r.processingCommitment.Load(confirmation.ID); ok {
transactionType = "BatchesCommitment"

View File

@@ -24,10 +24,9 @@ import (
)
var (
bridgeL1MsgsSyncHeightGauge = gethMetrics.NewRegisteredGauge("bridge/l1/msgs/sync/height", metrics.ScrollRegistry)
bridgeL1MsgsSentEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l1/msgs/sent/events/total", metrics.ScrollRegistry)
bridgeL1MsgsRelayedEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l1/msgs/relayed/events/total", metrics.ScrollRegistry)
bridgeL1MsgsRollupEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l1/msgs/rollup/events/total", metrics.ScrollRegistry)
bridgeL1MsgsSyncHeightGauge = gethMetrics.NewRegisteredGauge("bridge/l1/msgs/sync/height", metrics.ScrollRegistry)
bridgeL1MsgsSentEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l1/msgs/sent/events/total", metrics.ScrollRegistry)
bridgeL1MsgsRollupEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l1/msgs/rollup/events/total", metrics.ScrollRegistry)
)
type rollupEvent struct {
@@ -41,7 +40,6 @@ type L1WatcherClient struct {
ctx context.Context
client *ethclient.Client
l1MessageOrm *orm.L1Message
l2MessageOrm *orm.L2Message
l1BlockOrm *orm.L1Block
batchOrm *orm.Batch
@@ -91,7 +89,6 @@ func NewL1WatcherClient(ctx context.Context, client *ethclient.Client, startHeig
l1MessageOrm: l1MessageOrm,
l1BlockOrm: l1BlockOrm,
batchOrm: orm.NewBatch(db),
l2MessageOrm: orm.NewL2Message(db),
confirmations: confirmations,
messengerAddress: messengerAddress,
@@ -227,18 +224,16 @@ func (w *L1WatcherClient) FetchContractEvent() error {
}
log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "cnt", len(logs))
sentMessageEvents, relayedMessageEvents, rollupEvents, err := w.parseBridgeEventLogs(logs)
sentMessageEvents, rollupEvents, err := w.parseBridgeEventLogs(logs)
if err != nil {
log.Error("Failed to parse emitted events log", "err", err)
return err
}
sentMessageCount := int64(len(sentMessageEvents))
relayedMessageCount := int64(len(relayedMessageEvents))
rollupEventCount := int64(len(rollupEvents))
bridgeL1MsgsSentEventsTotalCounter.Inc(sentMessageCount)
bridgeL1MsgsRelayedEventsTotalCounter.Inc(relayedMessageCount)
bridgeL1MsgsRollupEventsTotalCounter.Inc(rollupEventCount)
log.Info("L1 events types", "SentMessageCount", sentMessageCount, "RelayedMessageCount", relayedMessageCount, "RollupEventCount", rollupEventCount)
log.Info("L1 events types", "SentMessageCount", sentMessageCount, "RollupEventCount", rollupEventCount)
// use rollup event to update rollup results db status
var batchHashes []string
@@ -272,21 +267,6 @@ func (w *L1WatcherClient) FetchContractEvent() error {
}
}
// Update relayed message first to make sure we don't forget to update submitted message.
// Since, we always start sync from the latest unprocessed message.
for _, msg := range relayedMessageEvents {
var msgStatus types.MsgStatus
if msg.isSuccessful {
msgStatus = types.MsgConfirmed
} else {
msgStatus = types.MsgFailed
}
if err = w.l2MessageOrm.UpdateLayer2StatusAndLayer1Hash(w.ctx, msg.msgHash.String(), msgStatus, msg.txHash.String()); err != nil {
log.Error("Failed to update layer1 status and layer2 hash", "err", err)
return err
}
}
if err = w.l1MessageOrm.SaveL1Messages(w.ctx, sentMessageEvents); err != nil {
return err
}
@@ -298,11 +278,10 @@ func (w *L1WatcherClient) FetchContractEvent() error {
return nil
}
func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1Message, []relayedMessage, []rollupEvent, error) {
func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1Message, []rollupEvent, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
var l1Messages []*orm.L1Message
var relayedMessages []relayedMessage
var rollupEvents []rollupEvent
for _, vLog := range logs {
switch vLog.Topics[0] {
@@ -311,7 +290,7 @@ func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1M
err := utils.UnpackLog(w.messageQueueABI, &event, "QueueTransaction", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 QueueTransaction event", "err", err)
return l1Messages, relayedMessages, rollupEvents, err
return l1Messages, rollupEvents, err
}
msgHash := common.BytesToHash(crypto.Keccak256(event.Data))
@@ -327,38 +306,12 @@ func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1M
GasLimit: event.GasLimit.Uint64(),
Layer1Hash: vLog.TxHash.Hex(),
})
case bridgeAbi.L1RelayedMessageEventSignature:
event := bridgeAbi.L1RelayedMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "RelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 RelayedMessage event", "err", err)
return l1Messages, relayedMessages, rollupEvents, err
}
relayedMessages = append(relayedMessages, relayedMessage{
msgHash: event.MessageHash,
txHash: vLog.TxHash,
isSuccessful: true,
})
case bridgeAbi.L1FailedRelayedMessageEventSignature:
event := bridgeAbi.L1FailedRelayedMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "FailedRelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 FailedRelayedMessage event", "err", err)
return l1Messages, relayedMessages, rollupEvents, err
}
relayedMessages = append(relayedMessages, relayedMessage{
msgHash: event.MessageHash,
txHash: vLog.TxHash,
isSuccessful: false,
})
case bridgeAbi.L1CommitBatchEventSignature:
event := bridgeAbi.L1CommitBatchEvent{}
err := utils.UnpackLog(w.scrollChainABI, &event, "CommitBatch", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 CommitBatch event", "err", err)
return l1Messages, relayedMessages, rollupEvents, err
return l1Messages, rollupEvents, err
}
rollupEvents = append(rollupEvents, rollupEvent{
@@ -371,7 +324,7 @@ func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1M
err := utils.UnpackLog(w.scrollChainABI, &event, "FinalizeBatch", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 FinalizeBatch event", "err", err)
return l1Messages, relayedMessages, rollupEvents, err
return l1Messages, rollupEvents, err
}
rollupEvents = append(rollupEvents, rollupEvent{
@@ -384,5 +337,5 @@ func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1M
}
}
return l1Messages, relayedMessages, rollupEvents, nil
return l1Messages, rollupEvents, nil
}

View File

@@ -159,14 +159,14 @@ func testL1WatcherClientFetchContractEvent(t *testing.T) {
convey.Convey("parse bridge event logs failure", t, func() {
targetErr := errors.New("parse log failure")
patchGuard.ApplyPrivateMethod(watcher, "parseBridgeEventLogs", func(*L1WatcherClient, []types.Log) ([]*orm.L1Message, []relayedMessage, []rollupEvent, error) {
return nil, nil, nil, targetErr
patchGuard.ApplyPrivateMethod(watcher, "parseBridgeEventLogs", func(*L1WatcherClient, []types.Log) ([]*orm.L1Message, []rollupEvent, error) {
return nil, nil, targetErr
})
err := watcher.FetchContractEvent()
assert.EqualError(t, err, targetErr.Error())
})
patchGuard.ApplyPrivateMethod(watcher, "parseBridgeEventLogs", func(*L1WatcherClient, []types.Log) ([]*orm.L1Message, []relayedMessage, []rollupEvent, error) {
patchGuard.ApplyPrivateMethod(watcher, "parseBridgeEventLogs", func(*L1WatcherClient, []types.Log) ([]*orm.L1Message, []rollupEvent, error) {
rollupEvents := []rollupEvent{
{
batchHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
@@ -179,20 +179,7 @@ func testL1WatcherClientFetchContractEvent(t *testing.T) {
status: commonTypes.RollupCommitted,
},
}
relayedMessageEvents := []relayedMessage{
{
msgHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
txHash: common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"),
isSuccessful: true,
},
{
msgHash: common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"),
txHash: common.HexToHash("0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30"),
isSuccessful: false,
},
}
return nil, relayedMessageEvents, rollupEvents, nil
return nil, rollupEvents, nil
})
var batchOrm *orm.Batch
@@ -250,20 +237,6 @@ func testL1WatcherClientFetchContractEvent(t *testing.T) {
return nil
})
var l2MessageOrm *orm.L2Message
convey.Convey("db update layer2 status and layer1 hash failure", t, func() {
targetErr := errors.New("UpdateLayer2StatusAndLayer1Hash failure")
patchGuard.ApplyMethodFunc(l2MessageOrm, "UpdateLayer2StatusAndLayer1Hash", func(context.Context, string, commonTypes.MsgStatus, string) error {
return targetErr
})
err := watcher.FetchContractEvent()
assert.Equal(t, targetErr.Error(), err.Error())
})
patchGuard.ApplyMethodFunc(l2MessageOrm, "UpdateLayer2StatusAndLayer1Hash", func(context.Context, string, commonTypes.MsgStatus, string) error {
return nil
})
var l1MessageOrm *orm.L1Message
convey.Convey("db save l1 message failure", t, func() {
targetErr := errors.New("SaveL1Messages failure")
@@ -303,10 +276,9 @@ func testParseBridgeEventLogsL1QueueTransactionEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
})
@@ -323,102 +295,14 @@ func testParseBridgeEventLogsL1QueueTransactionEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
assert.Len(t, l2Messages, 1)
assert.Equal(t, l2Messages[0].Value, big.NewInt(1000).String())
})
}
func testParseBridgeEventLogsL1RelayedMessageEventSignature(t *testing.T) {
watcher, db := setupL1Watcher(t)
defer utils.CloseDB(db)
logs := []types.Log{
{
Topics: []common.Hash{bridgeAbi.L1RelayedMessageEventSignature},
BlockNumber: 100,
TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
},
}
convey.Convey("unpack RelayedMessage log failure", t, func() {
targetErr := errors.New("UnpackLog RelayedMessage failure")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log types.Log) error {
return targetErr
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
})
convey.Convey("L1RelayedMessageEventSignature success", t, func() {
msgHash := common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log types.Log) error {
tmpOut := out.(*bridgeAbi.L1RelayedMessageEvent)
tmpOut.MessageHash = msgHash
return nil
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Empty(t, rollupEvents)
assert.Len(t, relayedMessages, 1)
assert.Equal(t, relayedMessages[0].msgHash, msgHash)
})
}
func testParseBridgeEventLogsL1FailedRelayedMessageEventSignature(t *testing.T) {
watcher, db := setupL1Watcher(t)
defer utils.CloseDB(db)
logs := []types.Log{
{
Topics: []common.Hash{bridgeAbi.L1FailedRelayedMessageEventSignature},
BlockNumber: 100,
TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
},
}
convey.Convey("unpack FailedRelayedMessage log failure", t, func() {
targetErr := errors.New("UnpackLog FailedRelayedMessage failure")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log types.Log) error {
return targetErr
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
})
convey.Convey("L1FailedRelayedMessageEventSignature success", t, func() {
msgHash := common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log types.Log) error {
tmpOut := out.(*bridgeAbi.L1FailedRelayedMessageEvent)
tmpOut.MessageHash = msgHash
return nil
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Empty(t, rollupEvents)
assert.Len(t, relayedMessages, 1)
assert.Equal(t, relayedMessages[0].msgHash, msgHash)
})
}
func testParseBridgeEventLogsL1CommitBatchEventSignature(t *testing.T) {
watcher, db := setupL1Watcher(t)
defer utils.CloseDB(db)
@@ -437,10 +321,9 @@ func testParseBridgeEventLogsL1CommitBatchEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
})
@@ -453,10 +336,9 @@ func testParseBridgeEventLogsL1CommitBatchEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Len(t, rollupEvents, 1)
assert.Equal(t, rollupEvents[0].batchHash, msgHash)
assert.Equal(t, rollupEvents[0].status, commonTypes.RollupCommitted)
@@ -481,10 +363,9 @@ func testParseBridgeEventLogsL1FinalizeBatchEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Empty(t, rollupEvents)
})
@@ -497,10 +378,9 @@ func testParseBridgeEventLogsL1FinalizeBatchEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
l2Messages, rollupEvents, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
assert.Len(t, rollupEvents, 1)
assert.Equal(t, rollupEvents[0].batchHash, msgHash)
assert.Equal(t, rollupEvents[0].status, commonTypes.RollupFinalized)

View File

@@ -2,7 +2,6 @@ package watcher
import (
"context"
"errors"
"fmt"
"math/big"
@@ -32,8 +31,6 @@ var (
bridgeL2MsgsSyncHeightGauge = gethMetrics.NewRegisteredGauge("bridge/l2/msgs/sync/height", metrics.ScrollRegistry)
bridgeL2BlocksFetchedHeightGauge = gethMetrics.NewRegisteredGauge("bridge/l2/blocks/fetched/height", metrics.ScrollRegistry)
bridgeL2BlocksFetchedGapGauge = gethMetrics.NewRegisteredGauge("bridge/l2/blocks/fetched/gap", metrics.ScrollRegistry)
bridgeL2MsgsSentEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l2/msgs/sent/events/total", metrics.ScrollRegistry)
bridgeL2MsgsAppendEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l2/msgs/append/events/total", metrics.ScrollRegistry)
bridgeL2MsgsRelayedEventsTotalCounter = gethMetrics.NewRegisteredCounter("bridge/l2/msgs/relayed/events/total", metrics.ScrollRegistry)
)
@@ -46,7 +43,6 @@ type L2WatcherClient struct {
l2BlockOrm *orm.L2Block
l1MessageOrm *orm.L1Message
l2MessageOrm *orm.L2Message
confirmations rpc.BlockNumber
@@ -65,11 +61,20 @@ type L2WatcherClient struct {
// NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, db *gorm.DB) *L2WatcherClient {
l2MessageOrm := orm.NewL2Message(db)
savedHeight, err := l2MessageOrm.GetLayer2LatestWatchedHeight()
if err != nil {
l1MessageOrm := orm.NewL1Message(db)
var savedHeight uint64
l1msg, err := l1MessageOrm.GetLayer1LatestMessageWithLayer2Hash()
if err != nil || l1msg == nil {
log.Warn("fetch height from db failed", "err", err)
savedHeight = 0
} else {
receipt, err := client.TransactionReceipt(ctx, common.HexToHash(l1msg.Layer2Hash))
if err != nil || receipt == nil {
log.Warn("get tx from l2 failed", "err", err)
savedHeight = 0
} else {
savedHeight = receipt.BlockNumber.Uint64()
}
}
w := L2WatcherClient{
@@ -78,8 +83,7 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
l2BlockOrm: orm.NewL2Block(db),
l1MessageOrm: orm.NewL1Message(db),
l2MessageOrm: l2MessageOrm,
processedMsgHeight: uint64(savedHeight),
processedMsgHeight: savedHeight,
confirmations: confirmations,
messengerAddress: messengerAddress,
@@ -126,10 +130,20 @@ func txsToTxsData(txs gethTypes.Transactions) []*gethTypes.TransactionData {
txsData := make([]*gethTypes.TransactionData, len(txs))
for i, tx := range txs {
v, r, s := tx.RawSignatureValues()
nonce := tx.Nonce()
// We need QueueIndex in `NewBatchHeader`. However, `TransactionData`
// does not have this field. Since `L1MessageTx` do not have a nonce,
// we reuse this field for storing the queue index.
if msg := tx.AsL1MessageTx(); msg != nil {
nonce = msg.QueueIndex
}
txsData[i] = &gethTypes.TransactionData{
Type: tx.Type(),
TxHash: tx.Hash().String(),
Nonce: tx.Nonce(),
Nonce: nonce,
ChainId: (*hexutil.Big)(tx.ChainId()),
Gas: tx.Gas(),
GasPrice: (*hexutil.Big)(tx.GasPrice()),
@@ -227,17 +241,15 @@ func (w *L2WatcherClient) FetchContractEvent() {
}
log.Info("received new L2 messages", "fromBlock", from, "toBlock", to, "cnt", len(logs))
sentMessageEvents, relayedMessageEvents, err := w.parseBridgeEventLogs(logs)
relayedMessageEvents, err := w.parseBridgeEventLogs(logs)
if err != nil {
log.Error("failed to parse emitted event log", "err", err)
return
}
sentMessageCount := int64(len(sentMessageEvents))
relayedMessageCount := int64(len(relayedMessageEvents))
bridgeL2MsgsSentEventsTotalCounter.Inc(sentMessageCount)
bridgeL2MsgsRelayedEventsTotalCounter.Inc(relayedMessageCount)
log.Info("L2 events types", "SentMessageCount", sentMessageCount, "RelayedMessageCount", relayedMessageCount)
log.Info("L2 events types", "RelayedMessageCount", relayedMessageCount)
// Update relayed message first to make sure we don't forget to update submited message.
// Since, we always start sync from the latest unprocessed message.
@@ -254,71 +266,24 @@ func (w *L2WatcherClient) FetchContractEvent() {
}
}
if err = w.l2MessageOrm.SaveL2Messages(w.ctx, sentMessageEvents); err != nil {
log.Error("failed to save l2 messages", "err", err)
return
}
w.processedMsgHeight = uint64(to)
bridgeL2MsgsSyncHeightGauge.Update(to)
}
}
func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]orm.L2Message, []relayedMessage, error) {
func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]relayedMessage, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
var l2Messages []orm.L2Message
var relayedMessages []relayedMessage
var lastAppendMsgHash common.Hash
var lastAppendMsgNonce uint64
for _, vLog := range logs {
switch vLog.Topics[0] {
case bridgeAbi.L2SentMessageEventSignature:
event := bridgeAbi.L2SentMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "SentMessage", vLog)
if err != nil {
log.Error("failed to unpack layer2 SentMessage event", "err", err)
return l2Messages, relayedMessages, err
}
computedMsgHash := utils.ComputeMessageHash(
event.Sender,
event.Target,
event.Value,
event.MessageNonce,
event.Message,
)
// `AppendMessage` event is always emitted before `SentMessage` event
// So they should always match, just double check
if event.MessageNonce.Uint64() != lastAppendMsgNonce {
errMsg := fmt.Sprintf("l2 message nonces mismatch: AppendMessage.nonce=%v, SentMessage.nonce=%v, tx_hash=%v",
lastAppendMsgNonce, event.MessageNonce.Uint64(), vLog.TxHash.Hex())
return l2Messages, relayedMessages, errors.New(errMsg)
}
if computedMsgHash != lastAppendMsgHash {
errMsg := fmt.Sprintf("l2 message hashes mismatch: AppendMessage.msg_hash=%v, SentMessage.msg_hash=%v, tx_hash=%v",
lastAppendMsgHash.Hex(), computedMsgHash.Hex(), vLog.TxHash.Hex())
return l2Messages, relayedMessages, errors.New(errMsg)
}
l2Messages = append(l2Messages, orm.L2Message{
Nonce: event.MessageNonce.Uint64(),
MsgHash: computedMsgHash.String(),
Height: vLog.BlockNumber,
Sender: event.Sender.String(),
Value: event.Value.String(),
Target: event.Target.String(),
Calldata: common.Bytes2Hex(event.Message),
Layer2Hash: vLog.TxHash.Hex(),
})
case bridgeAbi.L2RelayedMessageEventSignature:
event := bridgeAbi.L2RelayedMessageEvent{}
err := utils.UnpackLog(w.messengerABI, &event, "RelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer2 RelayedMessage event", "err", err)
return l2Messages, relayedMessages, err
return relayedMessages, err
}
relayedMessages = append(relayedMessages, relayedMessage{
@@ -331,7 +296,7 @@ func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]orm.L2Me
err := utils.UnpackLog(w.messengerABI, &event, "FailedRelayedMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer2 FailedRelayedMessage event", "err", err)
return l2Messages, relayedMessages, err
return relayedMessages, err
}
relayedMessages = append(relayedMessages, relayedMessage{
@@ -339,21 +304,9 @@ func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]orm.L2Me
txHash: vLog.TxHash,
isSuccessful: false,
})
case bridgeAbi.L2AppendMessageEventSignature:
event := bridgeAbi.L2AppendMessageEvent{}
err := utils.UnpackLog(w.messageQueueABI, &event, "AppendMessage", vLog)
if err != nil {
log.Warn("Failed to unpack layer2 AppendMessage event", "err", err)
return l2Messages, relayedMessages, err
}
lastAppendMsgHash = event.MessageHash
lastAppendMsgNonce = event.Index.Uint64()
bridgeL2MsgsAppendEventsTotalCounter.Inc(1)
default:
log.Error("Unknown event", "topic", vLog.Topics[0], "txHash", vLog.TxHash)
}
}
return l2Messages, relayedMessages, nil
return relayedMessages, nil
}

View File

@@ -21,7 +21,6 @@ import (
"github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"scroll-tech/common/types"
cutils "scroll-tech/common/utils"
bridgeAbi "scroll-tech/bridge/abi"
@@ -67,137 +66,6 @@ func testCreateNewWatcherAndStop(t *testing.T) {
assert.GreaterOrEqual(t, blockNum, uint64(numTransactions))
}
func testMonitorBridgeContract(t *testing.T) {
wc, db := setupL2Watcher(t)
subCtx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
defer utils.CloseDB(db)
}()
loopToFetchEvent(subCtx, wc)
previousHeight, err := l2Cli.BlockNumber(context.Background())
assert.NoError(t, err)
auth := prepareAuth(t, l2Cli, cfg.L2Config.RelayerConfig.MessageSenderPrivateKeys[0])
// deploy mock bridge
_, tx, instance, err := mock_bridge.DeployMockBridgeL2(auth, l2Cli)
assert.NoError(t, err)
address, err := bind.WaitDeployed(context.Background(), l2Cli, tx)
assert.NoError(t, err)
rc := prepareWatcherClient(l2Cli, db, address)
loopToFetchEvent(subCtx, rc)
// Call mock_bridge instance sendMessage to trigger emit events
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
message := []byte("testbridgecontract")
fee := big.NewInt(0)
gasLimit := big.NewInt(1)
tx, err = instance.SendMessage(auth, toAddress, fee, message, gasLimit)
assert.NoError(t, err)
receipt, err := bind.WaitMined(context.Background(), l2Cli, tx)
if receipt.Status != gethTypes.ReceiptStatusSuccessful || err != nil {
t.Fatalf("Call failed")
}
// extra block mined
toAddress = common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
message = []byte("testbridgecontract")
tx, err = instance.SendMessage(auth, toAddress, fee, message, gasLimit)
assert.NoError(t, err)
receipt, err = bind.WaitMined(context.Background(), l2Cli, tx)
if receipt.Status != gethTypes.ReceiptStatusSuccessful || err != nil {
t.Fatalf("Call failed")
}
l2MessageOrm := orm.NewL2Message(db)
// check if we successfully stored events
assert.True(t, cutils.TryTimes(10, func() bool {
height, err := l2MessageOrm.GetLayer2LatestWatchedHeight()
return err == nil && height > int64(previousHeight)
}))
// check l1 messages.
assert.True(t, cutils.TryTimes(10, func() bool {
msgs, err := l2MessageOrm.GetL2Messages(map[string]interface{}{"status": types.MsgPending}, nil, 0)
return err == nil && len(msgs) == 2
}))
}
func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
_, db := setupL2Watcher(t)
subCtx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
defer utils.CloseDB(db)
}()
previousHeight, err := l2Cli.BlockNumber(context.Background()) // shallow the global previousHeight
assert.NoError(t, err)
auth := prepareAuth(t, l2Cli, cfg.L2Config.RelayerConfig.MessageSenderPrivateKeys[0])
_, trx, instance, err := mock_bridge.DeployMockBridgeL2(auth, l2Cli)
assert.NoError(t, err)
address, err := bind.WaitDeployed(context.Background(), l2Cli, trx)
assert.NoError(t, err)
wc := prepareWatcherClient(l2Cli, db, address)
loopToFetchEvent(subCtx, wc)
// Call mock_bridge instance sendMessage to trigger emit events multiple times
numTransactions := 4
var tx *gethTypes.Transaction
for i := 0; i < numTransactions; i++ {
addr := common.HexToAddress("0x1c5a77d9fa7ef466951b2f01f724bca3a5820b63")
nonce, nounceErr := l2Cli.PendingNonceAt(context.Background(), addr)
assert.NoError(t, nounceErr)
auth.Nonce = big.NewInt(int64(nonce))
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
message := []byte("testbridgecontract")
fee := big.NewInt(0)
gasLimit := big.NewInt(1)
tx, err = instance.SendMessage(auth, toAddress, fee, message, gasLimit)
assert.NoError(t, err)
}
receipt, err := bind.WaitMined(context.Background(), l2Cli, tx)
if receipt.Status != gethTypes.ReceiptStatusSuccessful || err != nil {
t.Fatalf("Call failed")
}
// extra block mined
addr := common.HexToAddress("0x1c5a77d9fa7ef466951b2f01f724bca3a5820b63")
nonce, nounceErr := l2Cli.PendingNonceAt(context.Background(), addr)
assert.NoError(t, nounceErr)
auth.Nonce = big.NewInt(int64(nonce))
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
message := []byte("testbridgecontract")
fee := big.NewInt(0)
gasLimit := big.NewInt(1)
tx, err = instance.SendMessage(auth, toAddress, fee, message, gasLimit)
assert.NoError(t, err)
receipt, err = bind.WaitMined(context.Background(), l2Cli, tx)
if receipt.Status != gethTypes.ReceiptStatusSuccessful || err != nil {
t.Fatalf("Call failed")
}
l2MessageOrm := orm.NewL2Message(db)
// check if we successfully stored events
assert.True(t, cutils.TryTimes(10, func() bool {
height, err := l2MessageOrm.GetLayer2LatestWatchedHeight()
return err == nil && height > int64(previousHeight)
}))
assert.True(t, cutils.TryTimes(10, func() bool {
msgs, err := l2MessageOrm.GetL2Messages(map[string]interface{}{"status": types.MsgPending}, nil, 0)
return err == nil && len(msgs) == 5
}))
}
func testFetchRunningMissingBlocks(t *testing.T) {
_, db := setupL2Watcher(t)
defer utils.CloseDB(db)
@@ -244,57 +112,6 @@ func loopToFetchEvent(subCtx context.Context, watcher *L2WatcherClient) {
go cutils.Loop(subCtx, 2*time.Second, watcher.FetchContractEvent)
}
func testParseBridgeEventLogsL2SentMessageEventSignature(t *testing.T) {
watcher, db := setupL2Watcher(t)
defer utils.CloseDB(db)
logs := []gethTypes.Log{
{
Topics: []common.Hash{
bridgeAbi.L2SentMessageEventSignature,
},
BlockNumber: 100,
TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
},
}
convey.Convey("unpack SentMessage log failure", t, func() {
targetErr := errors.New("UnpackLog SentMessage failure")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error {
return targetErr
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
})
convey.Convey("L2SentMessageEventSignature success", t, func() {
tmpSendAddr := common.HexToAddress("0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30")
tmpTargetAddr := common.HexToAddress("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5")
tmpValue := big.NewInt(1000)
tmpMessageNonce := big.NewInt(100)
tmpMessage := []byte("test for L2SentMessageEventSignature")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error {
tmpOut := out.(*bridgeAbi.L2SentMessageEvent)
tmpOut.Sender = tmpSendAddr
tmpOut.Value = tmpValue
tmpOut.Target = tmpTargetAddr
tmpOut.MessageNonce = tmpMessageNonce
tmpOut.Message = tmpMessage
return nil
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.Error(t, err)
assert.Empty(t, relayedMessages)
assert.Empty(t, l2Messages)
})
}
func testParseBridgeEventLogsL2RelayedMessageEventSignature(t *testing.T) {
watcher, db := setupL2Watcher(t)
defer utils.CloseDB(db)
@@ -314,9 +131,8 @@ func testParseBridgeEventLogsL2RelayedMessageEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
})
@@ -329,9 +145,8 @@ func testParseBridgeEventLogsL2RelayedMessageEventSignature(t *testing.T) {
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Len(t, relayedMessages, 1)
assert.Equal(t, relayedMessages[0].msgHash, msgHash)
})
@@ -356,9 +171,8 @@ func testParseBridgeEventLogsL2FailedRelayedMessageEventSignature(t *testing.T)
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
})
@@ -371,51 +185,9 @@ func testParseBridgeEventLogsL2FailedRelayedMessageEventSignature(t *testing.T)
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Len(t, relayedMessages, 1)
assert.Equal(t, relayedMessages[0].msgHash, msgHash)
})
}
func testParseBridgeEventLogsL2AppendMessageEventSignature(t *testing.T) {
watcher, db := setupL2Watcher(t)
defer utils.CloseDB(db)
logs := []gethTypes.Log{
{
Topics: []common.Hash{bridgeAbi.L2AppendMessageEventSignature},
BlockNumber: 100,
TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
},
}
convey.Convey("unpack AppendMessage log failure", t, func() {
targetErr := errors.New("UnpackLog AppendMessage failure")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error {
return targetErr
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.EqualError(t, err, targetErr.Error())
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
})
convey.Convey("L2AppendMessageEventSignature success", t, func() {
msgHash := common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5")
patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error {
tmpOut := out.(*bridgeAbi.L2AppendMessageEvent)
tmpOut.MessageHash = msgHash
tmpOut.Index = big.NewInt(100)
return nil
})
defer patchGuard.Reset()
l2Messages, relayedMessages, err := watcher.parseBridgeEventLogs(logs)
assert.NoError(t, err)
assert.Empty(t, l2Messages)
assert.Empty(t, relayedMessages)
})
}

View File

@@ -100,20 +100,14 @@ func TestFunction(t *testing.T) {
t.Run("TestL1WatcherClientFetchBlockHeader", testL1WatcherClientFetchBlockHeader)
t.Run("TestL1WatcherClientFetchContractEvent", testL1WatcherClientFetchContractEvent)
t.Run("TestParseBridgeEventLogsL1QueueTransactionEventSignature", testParseBridgeEventLogsL1QueueTransactionEventSignature)
t.Run("TestParseBridgeEventLogsL1RelayedMessageEventSignature", testParseBridgeEventLogsL1RelayedMessageEventSignature)
t.Run("TestParseBridgeEventLogsL1FailedRelayedMessageEventSignature", testParseBridgeEventLogsL1FailedRelayedMessageEventSignature)
t.Run("TestParseBridgeEventLogsL1CommitBatchEventSignature", testParseBridgeEventLogsL1CommitBatchEventSignature)
t.Run("TestParseBridgeEventLogsL1FinalizeBatchEventSignature", testParseBridgeEventLogsL1FinalizeBatchEventSignature)
// Run l2 watcher test cases.
t.Run("TestCreateNewWatcherAndStop", testCreateNewWatcherAndStop)
t.Run("TestMonitorBridgeContract", testMonitorBridgeContract)
t.Run("TestFetchMultipleSentMessageInOneBlock", testFetchMultipleSentMessageInOneBlock)
t.Run("TestFetchRunningMissingBlocks", testFetchRunningMissingBlocks)
t.Run("TestParseBridgeEventLogsL2SentMessageEventSignature", testParseBridgeEventLogsL2SentMessageEventSignature)
t.Run("TestParseBridgeEventLogsL2RelayedMessageEventSignature", testParseBridgeEventLogsL2RelayedMessageEventSignature)
t.Run("TestParseBridgeEventLogsL2FailedRelayedMessageEventSignature", testParseBridgeEventLogsL2FailedRelayedMessageEventSignature)
t.Run("TestParseBridgeEventLogsL2AppendMessageEventSignature", testParseBridgeEventLogsL2AppendMessageEventSignature)
// Run chunk-proposer test cases.
t.Run("TestChunkProposer", testChunkProposer)

View File

@@ -52,6 +52,16 @@ func (m *L1Message) GetLayer1LatestWatchedHeight() (int64, error) {
return -1, nil
}
// GetLayer1LatestMessageWithLayer2Hash returns latest l1 message with layer2 hash
func (m *L1Message) GetLayer1LatestMessageWithLayer2Hash() (*L1Message, error) {
var msg *L1Message
err := m.db.Where("layer2_hash IS NOT NULL").Order("queue_index DESC").First(&msg).Error
if err != nil {
return nil, err
}
return msg, nil
}
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
func (m *L1Message) GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]L1Message, error) {
var msgs []L1Message

View File

@@ -1,127 +0,0 @@
package orm
import (
"context"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"scroll-tech/common/types"
)
// L2Message is structure of stored layer2 bridge message
type L2Message struct {
db *gorm.DB `gorm:"column:-"`
Nonce uint64 `json:"nonce" gorm:"column:nonce"`
MsgHash string `json:"msg_hash" gorm:"column:msg_hash"`
Height uint64 `json:"height" gorm:"column:height"`
Sender string `json:"sender" gorm:"column:sender"`
Value string `json:"value" gorm:"column:value"`
Target string `json:"target" gorm:"column:target"`
Calldata string `json:"calldata" gorm:"column:calldata"`
Layer2Hash string `json:"layer2_hash" gorm:"column:layer2_hash"`
Layer1Hash string `json:"layer1_hash" gorm:"column:layer1_hash;default:NULL"`
Proof string `json:"proof" gorm:"column:proof;default:NULL"`
Status int `json:"status" gorm:"column:status;default:1"`
}
// NewL2Message create an L2Message instance
func NewL2Message(db *gorm.DB) *L2Message {
return &L2Message{db: db}
}
// TableName define the L2Message table name
func (*L2Message) TableName() string {
return "l2_message"
}
// GetL2Messages fetch list of messages given msg status
func (m *L2Message) GetL2Messages(fields map[string]interface{}, orderByList []string, limit int) ([]L2Message, error) {
var l2MsgList []L2Message
db := m.db
for key, value := range fields {
db = db.Where(key, value)
}
for _, orderBy := range orderByList {
db = db.Order(orderBy)
}
if limit != 0 {
db = db.Limit(limit)
}
if err := db.Find(&l2MsgList).Error; err != nil {
return nil, err
}
return l2MsgList, nil
}
// GetLayer2LatestWatchedHeight returns latest height stored in the table
func (m *L2Message) GetLayer2LatestWatchedHeight() (int64, error) {
// @note It's not correct, since we may don't have message in some blocks.
// But it will only be called at start, some redundancy is acceptable.
result := m.db.Model(&L2Message{}).Select("COALESCE(MAX(height), -1)").Row()
if result.Err() != nil {
return -1, result.Err()
}
var maxNumber int64
if err := result.Scan(&maxNumber); err != nil {
return 0, err
}
return maxNumber, nil
}
// GetL2MessageByNonce fetch message by nonce
// for unit test
func (m *L2Message) GetL2MessageByNonce(nonce uint64) (*L2Message, error) {
var msg L2Message
err := m.db.Where("nonce", nonce).First(&msg).Error
if err != nil {
return nil, err
}
return &msg, nil
}
// SaveL2Messages batch save a list of layer2 messages
func (m *L2Message) SaveL2Messages(ctx context.Context, messages []L2Message) error {
if len(messages) == 0 {
return nil
}
err := m.db.WithContext(ctx).Create(&messages).Error
if err != nil {
nonces := make([]uint64, 0, len(messages))
heights := make([]uint64, 0, len(messages))
for _, msg := range messages {
nonces = append(nonces, msg.Nonce)
heights = append(heights, msg.Height)
}
log.Error("failed to insert layer2Messages", "nonces", nonces, "heights", heights, "err", err)
}
return err
}
// UpdateLayer2Status updates message stauts, given message hash
func (m *L2Message) UpdateLayer2Status(ctx context.Context, msgHash string, status types.MsgStatus) error {
err := m.db.Model(&L2Message{}).WithContext(ctx).Where("msg_hash", msgHash).Update("status", int(status)).Error
if err != nil {
return err
}
return nil
}
// UpdateLayer2StatusAndLayer1Hash updates message stauts and layer1 transaction hash, given message hash
func (m *L2Message) UpdateLayer2StatusAndLayer1Hash(ctx context.Context, msgHash string, status types.MsgStatus, layer1Hash string) error {
updateFields := map[string]interface{}{
"status": int(status),
"layer1_hash": layer1Hash,
}
err := m.db.Model(&L2Message{}).WithContext(ctx).Where("msg_hash", msgHash).Updates(updateFields).Error
if err != nil {
return err
}
return nil
}

View File

@@ -63,7 +63,7 @@ func testResetDB(t *testing.T) {
cur, err := Current(pgDB.DB)
assert.NoError(t, err)
// total number of tables.
assert.Equal(t, 6, int(cur))
assert.Equal(t, 5, int(cur))
}
func testMigrate(t *testing.T) {

View File

@@ -30,4 +30,4 @@ on l1_block (number);
-- +goose Down
-- +goose StatementBegin
drop table if exists l1_block;
-- +goose StatementEnd
-- +goose StatementEnd

View File

@@ -1,37 +0,0 @@
-- +goose Up
-- +goose StatementBegin
create table l2_message
(
nonce BIGINT NOT NULL,
msg_hash VARCHAR NOT NULL,
height BIGINT NOT NULL,
sender VARCHAR NOT NULL,
target VARCHAR NOT NULL,
value VARCHAR NOT NULL,
calldata TEXT NOT NULL,
layer2_hash VARCHAR NOT NULL,
layer1_hash VARCHAR DEFAULT NULL,
proof TEXT DEFAULT NULL,
status INTEGER DEFAULT 1,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP
);
comment
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired, relay_failed';
create unique index l2_message_hash_uindex
on l2_message (msg_hash);
create unique index l2_message_nonce_uindex
on l2_message (nonce);
create index l2_message_height_index
on l2_message (height);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
drop table if exists l2_message;
-- +goose StatementEnd

View File

@@ -36,7 +36,7 @@ func NewBatchHeader(version uint8, batchIndex, totalL1MessagePoppedBefore uint64
// the next queue index that we need to process
nextIndex := totalL1MessagePoppedBefore
for _, chunk := range chunks {
for chunkID, chunk := range chunks {
// build data hash
totalL1MessagePoppedBeforeChunk := nextIndex
chunkHash, err := chunk.Hash(totalL1MessagePoppedBeforeChunk)
@@ -46,7 +46,7 @@ func NewBatchHeader(version uint8, batchIndex, totalL1MessagePoppedBefore uint64
dataBytes = append(dataBytes, chunkHash.Bytes()...)
// build skip bitmap
for _, block := range chunk.Blocks {
for blockID, block := range chunk.Blocks {
for _, tx := range block.Transactions {
if tx.Type != types.L1MessageTxType {
continue
@@ -54,7 +54,7 @@ func NewBatchHeader(version uint8, batchIndex, totalL1MessagePoppedBefore uint64
currentIndex := tx.Nonce
if currentIndex < nextIndex {
return nil, fmt.Errorf("unexpected batch payload, expected queue index: %d, got: %d", nextIndex, currentIndex)
return nil, fmt.Errorf("unexpected batch payload, expected queue index: %d, got: %d. Batch index: %d, chunk index in batch: %d, block index in chunk: %d, block hash: %v, transaction hash: %v", nextIndex, currentIndex, batchIndex, chunkID, blockID, block.Header.Hash(), tx.TxHash)
}
// mark skipped messages

View File

@@ -5,8 +5,6 @@ import (
"database/sql"
"fmt"
"time"
"scroll-tech/common/types/message"
)
// L1BlockStatus represents current l1 block processing status
@@ -161,11 +159,18 @@ type RollerStatus struct {
// SessionInfo is assigned rollers info of a block batch (session)
type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
ProveType message.ProveType `json:"prove_type,omitempty"`
ID int `json:"id" db:"id"`
TaskID string `json:"task_id" db:"task_id"`
RollerPublicKey string `json:"roller_public_key" db:"roller_public_key"`
ProveType int16 `json:"prove_type" db:"prove_type"`
RollerName string `json:"roller_name" db:"roller_name"`
ProvingStatus int16 `json:"proving_status" db:"proving_status"`
FailureType int16 `json:"failure_type" db:"failure_type"`
Reward uint64 `json:"reward" db:"reward"`
Proof []byte `json:"proof" db:"proof"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.0.4"
var tag = "v4.0.7"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -51,10 +51,12 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
PublicKey: pk,
}
for id, sess := range m.sessions {
if _, ok := sess.info.Rollers[pk]; ok {
info.ActiveSessionStartTime = time.Unix(sess.info.StartTimestamp, 0)
info.ActiveSession = id
break
for _, sessionInfo := range sess.sessionInfos {
if sessionInfo.RollerPublicKey == pk {
info.ActiveSessionStartTime = *sessionInfo.CreatedAt
info.ActiveSession = id
break
}
}
}
res = append(res, info)
@@ -66,14 +68,14 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
func newSessionInfo(sess *session, status types.ProvingStatus, errMsg string, finished bool) *SessionInfo {
now := time.Now()
var nameList []string
for pk := range sess.info.Rollers {
nameList = append(nameList, sess.info.Rollers[pk].Name)
for _, sessionInfo := range sess.sessionInfos {
nameList = append(nameList, sessionInfo.RollerName)
}
info := SessionInfo{
ID: sess.info.ID,
ID: sess.taskID,
Status: status.String(),
AssignedRollers: nameList,
StartTime: time.Unix(sess.info.StartTimestamp, 0),
StartTime: *sess.sessionInfos[0].CreatedAt,
Error: errMsg,
}
if finished {

View File

@@ -57,7 +57,8 @@ type rollerProofStatus struct {
// Contains all the information on an ongoing proof generation session.
type session struct {
info *types.SessionInfo
taskID string
sessionInfos []*types.SessionInfo
// finish channel is used to pass the public key of the rollers who finished proving process.
finishChan chan rollerProofStatus
}
@@ -248,24 +249,21 @@ func (m *Manager) restorePrevSessions() {
log.Error("failed to recover roller session info from db", "error", err)
return
}
sessionInfosMaps := make(map[string][]*types.SessionInfo)
for _, v := range prevSessions {
log.Info("restore roller info for session", "session start time", v.CreatedAt, "session id", v.TaskID, "roller name",
v.RollerName, "prove type", v.ProveType, "public key", v.RollerPublicKey, "proof status", v.ProvingStatus)
sessionInfosMaps[v.TaskID] = append(sessionInfosMaps[v.TaskID], v)
}
for taskID, sessionInfos := range sessionInfosMaps {
sess := &session{
info: v,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.sessions[sess.info.ID] = sess
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
for _, roller := range sess.info.Rollers {
log.Info(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"prove type", sess.info.ProveType,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
m.sessions[taskID] = sess
go m.CollectProofs(sess)
}
@@ -287,36 +285,40 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if !ok {
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
}
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
var tmpSessionInfo *types.SessionInfo
for _, si := range sess.sessionInfos {
// get the send session info of this proof msg
if si.TaskID == msg.ID && si.RollerPublicKey == pk {
tmpSessionInfo = si
}
}
if tmpSessionInfo == nil {
return fmt.Errorf("proof generation session for id %v pk:%s does not existID", msg.ID, pk)
}
proofTime := time.Since(*tmpSessionInfo.CreatedAt)
proofTimeSec := uint64(proofTime.Seconds())
// Ensure this roller is eligible to participate in the session.
roller, ok := sess.info.Rollers[pk]
if !ok {
return fmt.Errorf("roller %s %s (%s) is not eligible to partake in proof session %v", roller.Name, sess.info.ProveType, roller.PublicKey, msg.ID)
}
if roller.Status == types.RollerProofValid {
if types.RollerProveStatus(tmpSessionInfo.ProvingStatus) == types.RollerProofValid {
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
// TODO: Defend invalid proof resubmissions by one of the following two methods:
// (i) slash the roller for each submission of invalid proof
// (ii) set the maximum failure retry times
log.Warn(
"roller has already submitted valid proof in proof session",
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey,
"prove type", tmpSessionInfo.ProveType,
"proof id", msg.ID,
)
return nil
}
log.Info(
"handling zk proof",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"proof time", proofTimeSec,
)
log.Info("handling zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName, "roller pk",
tmpSessionInfo.RollerPublicKey, "prove type", tmpSessionInfo.ProveType, "proof time", proofTimeSec)
defer func() {
// TODO: maybe we should use db tx for the whole process?
@@ -344,12 +346,12 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if msg.Status != message.StatusOk {
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsGeneratedFailedTimeTimer(roller.PublicKey, proofTime)
m.updateMetricRollerProofsGeneratedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info(
"proof generated by roller failed",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey,
"prove type", msg.Type,
"proof time", proofTimeSec,
"error", msg.Error,
@@ -383,8 +385,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if verifyErr != nil {
// TODO: this is only a temp workaround for testnet, we should return err in real cases
success = false
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
// TODO: Roller needs to be slashed if proof is invalid.
}
@@ -411,18 +413,32 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
}
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
} else {
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
}
return nil
}
// checkAttempts use the count of session info to check the attempts
func (m *Manager) checkAttemptsExceeded(hash string) bool {
sessionInfos, err := m.orm.GetSessionInfosByHashes([]string{hash})
if err != nil {
log.Error("get session info error", "hash id", hash, "error", err)
return true
}
if len(sessionInfos) >= int(m.cfg.SessionAttempts) {
return true
}
return false
}
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
coordinatorSessionsActiveNumberGauge.Inc(1)
@@ -432,48 +448,47 @@ func (m *Manager) CollectProofs(sess *session) {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if !m.checkAttemptsExceeded(sess.taskID) {
var success bool
if sess.info.ProveType == message.AggregatorProve {
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
success = m.StartAggProofGenerationSession(nil, sess)
} else if sess.info.ProveType == message.BasicProve {
} else if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
success = m.StartBasicProofGenerationSession(nil, sess)
}
if success {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, v := range sess.sessionInfos {
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
log.Info("Retrying session", "session id:", sess.taskID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
log.Warn(errMsg, "session id", sess.info.ID)
log.Warn(errMsg, "session id", sess.taskID)
// Set status as skipped.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
if sess.info.ProveType == message.BasicProve {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset basic task_status as Unassigned", "id", sess.info.ID, "err", err)
if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
if err := m.orm.UpdateProvingStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset basic task_status as Unassigned", "id", sess.taskID, "err", err)
}
}
if sess.info.ProveType == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.info.ID, "err", err)
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.taskID, "err", err)
}
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, v := range sess.sessionInfos {
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
}
delete(m.sessions, sess.info.ID)
delete(m.sessions, sess.taskID)
m.mu.Unlock()
coordinatorSessionsTimeoutTotalCounter.Inc(1)
return
@@ -481,7 +496,12 @@ func (m *Manager) CollectProofs(sess *session) {
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
case ret := <-sess.finishChan:
m.mu.Lock()
sess.info.Rollers[ret.pk].Status = ret.status
for idx := range sess.sessionInfos {
if sess.sessionInfos[idx].RollerPublicKey == ret.pk {
sess.sessionInfos[idx].ProvingStatus = int16(ret.status)
}
}
if sess.isSessionFailed() {
if ret.typ == message.BasicProve {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
@@ -493,12 +513,13 @@ func (m *Manager) CollectProofs(sess *session) {
log.Error("failed to update aggregator proving_status as failed", "msg.ID", ret.id, "error", err)
}
}
coordinatorSessionsFailedTotalCounter.Inc(1)
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
if err := m.orm.UpdateSessionInfoProvingStatus(m.ctx, ret.typ, ret.id, ret.pk, ret.status); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
//Check if all rollers have finished their tasks, and rollers with valid results are indexed by public key.
finished, validRollers := sess.isRollersFinished()
@@ -508,11 +529,10 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := rand.Int63n(int64(len(validRollers)))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, sessionInfo := range sess.sessionInfos {
m.freeTaskIDForRoller(sessionInfo.RollerPublicKey, sessionInfo.TaskID)
delete(m.sessions, sessionInfo.TaskID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsSuccessTotalCounter.Inc(1)
@@ -528,14 +548,16 @@ func (m *Manager) CollectProofs(sess *session) {
// validRollers also records the public keys of rollers who have finished their tasks correctly as index.
func (s *session) isRollersFinished() (bool, []string) {
var validRollers []string
for pk, roller := range s.info.Rollers {
if roller.Status == types.RollerProofValid {
validRollers = append(validRollers, pk)
for _, sessionInfo := range s.sessionInfos {
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofValid {
validRollers = append(validRollers, sessionInfo.RollerPublicKey)
continue
}
if roller.Status == types.RollerProofInvalid {
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofInvalid {
continue
}
// Some rollers are still proving.
return false, nil
}
@@ -543,8 +565,8 @@ func (s *session) isRollersFinished() (bool, []string) {
}
func (s *session) isSessionFailed() bool {
for _, roller := range s.info.Rollers {
if roller.Status != types.RollerProofInvalid {
for _, sessionInfo := range s.sessionInfos {
if types.RollerProveStatus(sessionInfo.ProvingStatus) != types.RollerProofInvalid {
return false
}
}
@@ -573,7 +595,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
if task != nil {
taskID = task.Hash
} else {
taskID = prevSession.info.ID
taskID = prevSession.taskID
}
if m.GetNumberOfIdleRollers(message.BasicProve) == 0 {
log.Warn("no idle basic roller when starting proof generation session", "id", taskID)
@@ -612,7 +634,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
}
// Dispatch task to basic rollers.
rollers := make(map[string]*types.RollerStatus)
var sessionInfos []*types.SessionInfo
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.BasicProve)
if roller == nil {
@@ -626,10 +648,27 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
now := time.Now()
tmpSessionInfo := types.SessionInfo{
TaskID: taskID,
RollerPublicKey: roller.PublicKey,
ProveType: int16(message.BasicProve),
RollerName: roller.Name,
CreatedAt: &now,
ProvingStatus: int16(types.RollerAssigned),
}
// Store session info.
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
log.Error("db set session info fail", "session id", taskID, "error", err)
return false
}
sessionInfos = append(sessionInfos, &tmpSessionInfo)
log.Info("assigned proof to roller", "session id", taskID, "session type", message.BasicProve, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
}
// No roller assigned.
if len(rollers) == 0 {
if len(sessionInfos) == 0 {
log.Error("no roller assigned", "id", taskID, "number of idle basic rollers", m.GetNumberOfIdleRollers(message.BasicProve))
return false
}
@@ -642,33 +681,9 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskID,
Rollers: rollers,
ProveType: message.BasicProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.mu.Lock()
@@ -685,7 +700,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
if task != nil {
taskID = task.ID
} else {
taskID = prevSession.info.ID
taskID = prevSession.taskID
}
if m.GetNumberOfIdleRollers(message.AggregatorProve) == 0 {
log.Warn("no idle common roller when starting proof generation session", "id", taskID)
@@ -715,7 +730,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
}
// Dispatch task to basic rollers.
rollers := make(map[string]*types.RollerStatus)
var sessionInfos []*types.SessionInfo
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.AggregatorProve)
if roller == nil {
@@ -732,11 +747,29 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskID)
continue
}
now := time.Now()
tmpSessionInfo := types.SessionInfo{
TaskID: taskID,
RollerPublicKey: roller.PublicKey,
ProveType: int16(message.AggregatorProve),
RollerName: roller.Name,
CreatedAt: &now,
ProvingStatus: int16(types.RollerAssigned),
}
// Store session info.
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
log.Error("db set session info fail", "session id", taskID, "error", err)
return false
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
sessionInfos = append(sessionInfos, &tmpSessionInfo)
log.Info("assigned proof to roller", "session id", taskID, "session type", message.AggregatorProve, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
}
// No roller assigned.
if len(rollers) == 0 {
if len(sessionInfos) == 0 {
log.Error("no roller assigned", "id", taskID, "number of idle aggregator rollers", m.GetNumberOfIdleRollers(message.AggregatorProve))
return false
}
@@ -749,33 +782,9 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskID,
Rollers: rollers,
ProveType: message.AggregatorProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.mu.Lock()
@@ -789,7 +798,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
func (m *Manager) addFailedSession(sess *session, errMsg string) {
m.mu.Lock()
defer m.mu.Unlock()
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
m.failedSessionInfos[sess.taskID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
}
// VerifyToken verifies pukey for token and expiration time

View File

@@ -53,8 +53,8 @@ func (m *Manager) reloadRollerAssignedTasks(pubkey string) *cmap.ConcurrentMap {
defer m.mu.RUnlock()
taskIDs := cmap.New()
for id, sess := range m.sessions {
for pk, roller := range sess.info.Rollers {
if pk == pubkey && roller.Status == types.RollerAssigned {
for _, sessionInfo := range sess.sessionInfos {
if sessionInfo.RollerPublicKey == pubkey && sessionInfo.ProvingStatus == int16(types.RollerAssigned) {
taskIDs.Set(id, struct{}{})
}
}

View File

@@ -3,12 +3,21 @@
create table session_info
(
hash VARCHAR NOT NULL,
rollers_info BYTEA NOT NULL
);
id BIGSERIAL PRIMARY KEY,
task_id VARCHAR NOT NULL,
roller_public_key VARCHAR NOT NULL,
prove_type SMALLINT DEFAULT 0,
roller_name VARCHAR NOT NULL,
proving_status SMALLINT DEFAULT 1,
failure_type SMALLINT DEFAULT 0,
reward BIGINT DEFAULT 0,
proof BYTEA DEFAULT NULL,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL,
create unique index session_info_hash_uindex
on session_info (hash);
CONSTRAINT uk_session_unique UNIQUE (task_id, roller_public_key)
);
-- +goose StatementEnd

View File

@@ -44,6 +44,7 @@ type BlockTraceOrm interface {
type SessionInfoOrm interface {
GetSessionInfosByHashes(hashes []string) ([]*types.SessionInfo, error)
SetSessionInfo(rollersInfo *types.SessionInfo) error
UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error
}
// AggTaskOrm is aggregator task

View File

@@ -1,11 +1,12 @@
package orm
import (
"encoding/json"
"context"
"github.com/jmoiron/sqlx"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
)
type sessionInfoOrm struct {
@@ -23,7 +24,7 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
if len(hashes) == 0 {
return nil, nil
}
query, args, err := sqlx.In("SELECT rollers_info FROM session_info WHERE hash IN (?);", hashes)
query, args, err := sqlx.In("SELECT * FROM session_info WHERE task_id IN (?);", hashes)
if err != nil {
return nil, err
}
@@ -35,15 +36,11 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
var sessionInfos []*types.SessionInfo
for rows.Next() {
var infoBytes []byte
if err = rows.Scan(&infoBytes); err != nil {
var sessionInfo types.SessionInfo
if err = rows.StructScan(&sessionInfo); err != nil {
return nil, err
}
sessionInfo := &types.SessionInfo{}
if err = json.Unmarshal(infoBytes, sessionInfo); err != nil {
return nil, err
}
sessionInfos = append(sessionInfos, sessionInfo)
sessionInfos = append(sessionInfos, &sessionInfo)
}
if err = rows.Err(); err != nil {
return nil, err
@@ -53,11 +50,16 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
}
func (o *sessionInfoOrm) SetSessionInfo(rollersInfo *types.SessionInfo) error {
infoBytes, err := json.Marshal(rollersInfo)
if err != nil {
return err
}
sqlStr := "INSERT INTO session_info (hash, rollers_info) VALUES ($1, $2) ON CONFLICT (hash) DO UPDATE SET rollers_info = EXCLUDED.rollers_info;"
_, err = o.db.Exec(sqlStr, rollersInfo.ID, infoBytes)
sqlStr := "INSERT INTO session_info (task_id, roller_public_key, prove_type, roller_name, proving_status, failure_type, reward, proof, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (task_id, roller_public_key) DO UPDATE SET proving_status = EXCLUDED.proving_status;"
_, err := o.db.Exec(sqlStr, rollersInfo.TaskID, rollersInfo.RollerPublicKey, rollersInfo.ProveType, rollersInfo.RollerName,
rollersInfo.ProvingStatus, rollersInfo.FailureType, rollersInfo.Reward, rollersInfo.Proof, rollersInfo.CreatedAt)
return err
}
// UpdateSessionInfoProvingStatus update the session info proving status
func (o *sessionInfoOrm) UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error {
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update session_info set proving_status = ? where prove_type = ? and task_id = ? and roller_public_key = ? ;"), int(proveType), int(status), taskID, pk); err != nil {
return err
}
return nil
}

View File

@@ -410,31 +410,29 @@ func testOrmSessionInfo(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(sessionInfos))
now := time.Now()
sessionInfo := types.SessionInfo{
ID: batchHash,
Rollers: map[string]*types.RollerStatus{
"0": {
PublicKey: "0",
Name: "roller-0",
Status: types.RollerAssigned,
},
},
StartTimestamp: time.Now().Unix()}
TaskID: batchHash,
RollerName: "roller-0",
RollerPublicKey: "0",
ProvingStatus: int16(types.RollerAssigned),
CreatedAt: &now,
}
// insert
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
assert.NoError(t, err)
assert.Equal(t, 1, len(sessionInfos))
assert.Equal(t, sessionInfo, *sessionInfos[0])
assert.Equal(t, sessionInfo.RollerName, sessionInfos[0].RollerName)
// update
sessionInfo.Rollers["0"].Status = types.RollerProofValid
sessionInfo.ProvingStatus = int16(types.RollerProofValid)
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
assert.NoError(t, err)
assert.Equal(t, 1, len(sessionInfos))
assert.Equal(t, sessionInfo, *sessionInfos[0])
assert.Equal(t, sessionInfo.ProvingStatus, sessionInfos[0].ProvingStatus)
// delete
assert.NoError(t, ormBatch.UpdateProvingStatus(batchHash, types.ProvingTaskVerified))