Compare commits

...

2 Commits

Author SHA1 Message Date
georgehao
999f3ceaaf feat: add grom logger 2023-07-04 18:24:37 +08:00
georgehao
a5ab53161a feat: remove unused uitls floder 2023-07-03 10:19:20 +08:00
43 changed files with 453 additions and 737 deletions

View File

@@ -75,7 +75,7 @@ func (c *CrossMsgFetcher) Start() {
return
case <-tick.C:
c.mu.Lock()
c.forwardFetchAndSaveMissingEvents(0)
c.forwardFetchAndSaveMissingEvents(1)
c.mu.Unlock()
}
}

View File

@@ -12,6 +12,7 @@ import (
backendabi "bridge-history-api/abi"
"bridge-history-api/db"
"bridge-history-api/db/orm"
"bridge-history-api/utils"
)
@@ -104,6 +105,14 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Error("l1FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
return err
}
for i := range depositL1CrossMsgs {
for _, msgHash := range msgHashes {
if depositL1CrossMsgs[i].Layer1Hash == msgHash.TxHash.Hex() {
depositL1CrossMsgs[i].MsgHash = msgHash.MsgHash.Hex()
break
}
}
}
dbTx, err := database.Beginx()
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
@@ -120,11 +129,6 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
dbTx.Rollback()
log.Crit("l1FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
}
err = updateL1CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
if err != nil {
dbTx.Rollback()
log.Crit("l1FetchAndSaveEvents: Failed to update msgHash in L1 cross msg", "err", err)
}
err = dbTx.Commit()
if err != nil {
// if we can not insert into DB, there must something wrong, need a on-call member handle the dababase manually
@@ -157,11 +161,22 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Warn("Failed to get l2 event logs", "err", err)
return err
}
depositL2CrossMsgs, msgHashes, relayedMsg, l2sentMsgs, err := utils.ParseBackendL2EventLogs(logs)
depositL2CrossMsgs, relayedMsg, L2SentMsgWrappers, err := utils.ParseBackendL2EventLogs(logs)
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
return err
}
var l2SentMsgs []*orm.L2SentMsg
for i := range depositL2CrossMsgs {
for _, l2SentMsgWrapper := range L2SentMsgWrappers {
if depositL2CrossMsgs[i].Layer2Hash == l2SentMsgWrapper.TxHash.Hex() {
depositL2CrossMsgs[i].MsgHash = l2SentMsgWrapper.L2SentMsg.MsgHash
l2SentMsgWrapper.L2SentMsg.TxSender = depositL2CrossMsgs[i].Sender
l2SentMsgs = append(l2SentMsgs, l2SentMsgWrapper.L2SentMsg)
break
}
}
}
dbTx, err := database.Beginx()
if err != nil {
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
@@ -179,16 +194,12 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
log.Crit("l2FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
}
err = updateL2CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to update msgHash in L2 cross msg", "err", err)
}
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2sentMsgs)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
if len(l2SentMsgs) > 0 {
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2SentMsgs)
if err != nil {
dbTx.Rollback()
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
}
}
err = dbTx.Commit()

View File

@@ -12,30 +12,32 @@ create table cross_message
layer2_hash VARCHAR NOT NULL DEFAULT '',
layer1_token VARCHAR NOT NULL DEFAULT '',
layer2_token VARCHAR NOT NULL DEFAULT '',
token_id BIGINT NOT NULL DEFAULT 0,
asset SMALLINT NOT NULL,
msg_type SMALLINT NOT NULL,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
-- use array to support nft bridge
token_ids VARCHAR[] NOT NULL DEFAULT '{}',
-- use array to support nft bridge
token_amounts VARCHAR[] NOT NULL DEFAULT '{}',
block_timestamp TIMESTAMP(0) DEFAULT NULL,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
create unique index uk_msg_hash_msg_type
on cross_message (msg_hash, msg_type) where deleted_at IS NULL;
comment
on column cross_message.asset is 'ETH, ERC20, ERC721, ERC1155';
comment
on column cross_message.msg_type is 'unknown, l1msg, l2msg';
comment
on column cross_message.is_deleted is 'NotDeleted false, Deleted true';
CREATE INDEX idx_l1_msg_index ON cross_message (layer1_hash, deleted_at);
CREATE INDEX valid_l1_msg_index ON cross_message (layer1_hash, is_deleted);
CREATE INDEX idx_l2_msg_index ON cross_message (layer2_hash, deleted_at);
CREATE INDEX valid_l2_msg_index ON cross_message (layer2_hash, is_deleted);
CREATE INDEX valid_height_index ON cross_message (height, msg_type, is_deleted);
CREATE INDEX idx_height_msg_type_index ON cross_message (height, msg_type, deleted_at);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -49,22 +51,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON cross_message FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE cross_message SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON cross_message
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -7,17 +7,17 @@ create table relayed_msg
height BIGINT NOT NULL,
layer1_hash VARCHAR NOT NULL DEFAULT '',
layer2_hash VARCHAR NOT NULL DEFAULT '',
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
comment
on column relayed_msg.is_deleted is 'NotDeleted, Deleted';
create unique index uk_msg_hash
on relayed_msg (msg_hash) where deleted_at IS NULL;
create unique index relayed_msg_hash_uindex
on relayed_msg (msg_hash);
CREATE INDEX idx_l1_msg_index ON relayed_msg (layer1_hash, deleted_at);
CREATE INDEX idx_l2_msg_index ON relayed_msg (layer2_hash, deleted_at);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -31,22 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON relayed_msg FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE relayed_msg SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON relayed_msg
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -3,6 +3,7 @@
create table l2_sent_msg
(
id BIGSERIAL PRIMARY KEY,
tx_sender VARCHAR NOT NULL,
sender VARCHAR NOT NULL,
target VARCHAR NOT NULL,
value VARCHAR NOT NULL,
@@ -12,14 +13,16 @@ create table l2_sent_msg
batch_index BIGINT NOT NULL DEFAULT 0,
msg_proof TEXT NOT NULL DEFAULT '',
msg_data TEXT NOT NULL DEFAULT '',
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
comment
on column l2_sent_msg.is_deleted is 'NotDeleted, Deleted';
create unique index uk_msg_hash
on l2_sent_msg (msg_hash) where deleted_at IS NULL;
create unique index uk_nonce
on l2_sent_msg (nonce) where deleted_at IS NULL;
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
@@ -33,22 +36,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON l2_sent_msg FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE l2_sent_msg SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON l2_sent_msg
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -8,12 +8,17 @@ create table rollup_batch
start_block_number BIGINT NOT NULL,
end_block_number BIGINT NOT NULL,
batch_hash VARCHAR NOT NULL,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL
);
create unique index uk_batch_index
on rollup_batch (batch_index) where deleted_at IS NULL;
create unique index uk_batch_hash
on rollup_batch (batch_hash) where deleted_at IS NULL;
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
@@ -26,21 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
ON rollup_batch FOR EACH ROW EXECUTE PROCEDURE
update_timestamp();
CREATE OR REPLACE FUNCTION deleted_at_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
UPDATE rollup_batch SET deleted_at = NOW() WHERE id = NEW.id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER deleted_at_trigger
AFTER UPDATE ON rollup_batch
FOR EACH ROW
EXECUTE FUNCTION deleted_at_trigger();
-- +goose StatementEnd
-- +goose Down

View File

@@ -2,7 +2,6 @@ package orm
import (
"database/sql"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/jmoiron/sqlx"
@@ -40,14 +39,6 @@ func (b *rollupBatchOrm) BatchInsertRollupBatchDBTx(dbTx *sqlx.Tx, batches []*Ro
"start_block_number": batch.StartBlockNumber,
"end_block_number": batch.EndBlockNumber,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM rollup_batch WHERE batch_index = $1 AND NOT is_deleted)`, batch.BatchIndex).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertRollupBatchDBTx: batch index %v already exists at height %v", batch.BatchIndex, batch.CommitHeight)
}
}
_, err = dbTx.NamedExec(`insert into rollup_batch(commit_height, batch_index, batch_hash, start_block_number, end_block_number) values(:commit_height, :batch_index, :batch_hash, :start_block_number, :end_block_number);`, batchMaps)
if err != nil {

View File

@@ -40,31 +40,24 @@ const (
// CrossMsg represents a cross message from layer 1 to layer 2
type CrossMsg struct {
ID uint64 `json:"id" db:"id"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
Amount string `json:"amount" db:"amount"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Layer1Token string `json:"layer1_token" db:"layer1_token"`
Layer2Token string `json:"layer2_token" db:"layer2_token"`
TokenID uint64 `json:"token_id" db:"token_id"`
Asset int `json:"asset" db:"asset"`
MsgType int `json:"msg_type" db:"msg_type"`
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
type RelayedMsg struct {
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
ID uint64 `json:"id" db:"id"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
Amount string `json:"amount" db:"amount"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Layer1Token string `json:"layer1_token" db:"layer1_token"`
Layer2Token string `json:"layer2_token" db:"layer2_token"`
TokenIDs []string `json:"token_ids" db:"token_ids"`
TokenAmounts []string `json:"token_amounts" db:"token_amounts"`
Asset int `json:"asset" db:"asset"`
MsgType int `json:"msg_type" db:"msg_type"`
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
// L1CrossMsgOrm provides operations on l1_cross_message table

View File

@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -23,7 +22,7 @@ func NewL1CrossMsgOrm(db *sqlx.DB) L1CrossMsgOrm {
func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, error) {
result := &CrossMsg{}
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND NOT is_deleted;`, l1Hash.String(), Layer1Msg)
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND deleted_at IS NULL;`, l1Hash.String(), Layer1Msg)
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -37,7 +36,7 @@ func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, erro
// Warning: return empty slice if no data found
func (l *l1CrossMsgOrm) GetL1CrossMsgsByAddress(sender common.Address) ([]*CrossMsg, error) {
var results []*CrossMsg
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND NOT is_deleted;`, sender.String(), Layer1Msg)
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND deleted_at IS NULL;`, sender.String(), Layer1Msg)
for rows.Next() {
msg := &CrossMsg{}
@@ -69,19 +68,11 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
"layer1_hash": msg.Layer1Hash,
"layer1_token": msg.Layer1Token,
"layer2_token": msg.Layer2Token,
"token_id": msg.TokenID,
"token_ids": msg.TokenIDs,
"msg_type": Layer1Msg,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer1_hash = $1 AND NOT is_deleted)`, msg.Layer1Hash).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL1CrossMsgDBTx: l1 cross msg layer1Hash %v already exists at height %v", msg.Layer1Hash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
if err != nil {
log.Error("BatchInsertL1CrossMsgDBTx: failed to insert l1 cross msgs", "err", err)
return err
@@ -92,7 +83,7 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
// UpdateL1CrossMsgHashDBTx update l1 cross msg hash in db, no need to check msg_type since layer1_hash wont be empty if its layer1 msg
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l1Hash, msgHash common.Hash) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
return err
}
return nil
@@ -100,7 +91,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx
}
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHash common.Hash) error {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
return err
}
return nil
@@ -108,7 +99,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHas
}
func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer1Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer1Msg)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -123,21 +114,21 @@ func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
}
func (l *l1CrossMsgOrm) DeleteL1CrossMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
if _, err := l.db.Exec(`UPDATE cross_message SET is_deleted = true WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET deleted_at = current_timestamp WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
return err
}
return nil
}
func (l *l1CrossMsgOrm) UpdateL1BlockTimestamp(height uint64, timestamp time.Time) error {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer1Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer1Msg); err != nil {
return err
}
return nil
}
func (l *l1CrossMsgOrm) GetL1EarliestNoBlockTimestampHeight() (uint64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer1Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer1Msg)
var result uint64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows {

View File

@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -23,7 +22,7 @@ func NewL2CrossMsgOrm(db *sqlx.DB) L2CrossMsgOrm {
func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, error) {
result := &CrossMsg{}
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted;`, l2Hash.String())
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND deleted_at IS NULL;`, l2Hash.String())
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -37,7 +36,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, erro
// Warning: return empty slice if no data found
func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossMsg, error) {
var results []*CrossMsg
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND NOT is_deleted;`, sender.String(), Layer2Msg)
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND deleted_at IS NULL;`, sender.String(), Layer2Msg)
for rows.Next() {
msg := &CrossMsg{}
@@ -56,7 +55,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossM
}
func (l *l2CrossMsgOrm) DeleteL2CrossMsgFromHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE cross_message SET is_deleted = true where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
_, err := dbTx.Exec(`UPDATE cross_message SET deleted_at = current_timestamp where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
if err != nil {
log.Error("DeleteL1CrossMsgAfterHeightDBTx: failed to delete", "height", height, "err", err)
return err
@@ -81,20 +80,12 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
"layer2_hash": msg.Layer2Hash,
"layer1_token": msg.Layer1Token,
"layer2_token": msg.Layer2Token,
"token_id": msg.TokenID,
"token_ids": msg.TokenIDs,
"amount": msg.Amount,
"msg_type": Layer2Msg,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted)`, msg.Layer2Hash).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL2CrossMsgDBTx: l2 cross msg layer2Hash %v already exists at height %v", msg.Layer2Hash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
if err != nil {
log.Error("BatchInsertL2CrossMsgDBTx: failed to insert l2 cross msgs", "err", err)
return err
@@ -103,21 +94,21 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
}
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l2Hash, msgHash common.Hash) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHash(ctx context.Context, l2Hash, msgHash common.Hash) error {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer2Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer2Msg)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -132,14 +123,14 @@ func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
}
func (l *l2CrossMsgOrm) UpdateL2BlockTimestamp(height uint64, timestamp time.Time) error {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer2Msg); err != nil {
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer2Msg); err != nil {
return err
}
return nil
}
func (l *l2CrossMsgOrm) GetL2EarliestNoBlockTimestampHeight() (uint64, error) {
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer2Msg)
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer2Msg)
var result uint64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows {

View File

@@ -3,7 +3,6 @@ package orm
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
@@ -12,6 +11,7 @@ import (
type L2SentMsg struct {
ID uint64 `json:"id" db:"id"`
TxSender string `json:"tx_sender" db:"tx_sender"`
MsgHash string `json:"msg_hash" db:"msg_hash"`
Sender string `json:"sender" db:"sender"`
Target string `json:"target" db:"target"`
@@ -21,7 +21,6 @@ type L2SentMsg struct {
BatchIndex uint64 `json:"batch_index" db:"batch_index"`
MsgProof string `json:"msg_proof" db:"msg_proof"`
MsgData string `json:"msg_data" db:"msg_data"`
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
@@ -38,7 +37,7 @@ func NewL2SentMsgOrm(db *sqlx.DB) L2SentMsgOrm {
func (l *l2SentMsgOrm) GetL2SentMsgByHash(msgHash string) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msgHash)
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msgHash)
if err := row.StructScan(result); err != nil {
return nil, err
}
@@ -53,6 +52,7 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
messageMaps := make([]map[string]interface{}, len(messages))
for i, msg := range messages {
messageMaps[i] = map[string]interface{}{
"tx_sender": msg.TxSender,
"sender": msg.Sender,
"target": msg.Target,
"value": msg.Value,
@@ -63,25 +63,17 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
"msg_proof": msg.MsgProof,
"msg_data": msg.MsgData,
}
var exists bool
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM l2_sent_msg WHERE (msg_hash = $1 OR nonce = $2) AND NOT is_deleted)`, msg.MsgHash, msg.Nonce).Scan(&exists)
if err != nil {
return err
}
if exists {
return fmt.Errorf("BatchInsertL2SentMsgDBTx: l2 sent msg_hash %v already exists at height %v", msg.MsgHash, msg.Height)
}
}
_, err = dbTx.NamedExec(`insert into l2_sent_msg(sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
_, err = dbTx.NamedExec(`insert into l2_sent_msg(tx_sender, sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:tx_sender, :sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
if err != nil {
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "msg_Hash", "err", err)
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "err", err)
return err
}
return err
}
func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE NOT is_deleted ORDER BY nonce DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE deleted_at IS NULL ORDER BY nonce DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -96,14 +88,14 @@ func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
}
func (l *l2SentMsgOrm) UpdateL2MessageProofInDBTx(ctx context.Context, dbTx *sqlx.Tx, msgHash string, proof string, batch_index uint64) error {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND NOT is_deleted;"), proof, batch_index, msgHash); err != nil {
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND deleted_at IS NULL;"), proof, batch_index, msgHash); err != nil {
return err
}
return nil
}
func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND NOT is_deleted ORDER BY batch_index DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND deleted_at IS NULL ORDER BY batch_index DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -119,7 +111,7 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight uint64) ([]*L2SentMsg, error) {
var results []*L2SentMsg
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND NOT is_deleted ORDER BY nonce ASC;`, startHeight, endHeight)
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND deleted_at IS NULL ORDER BY nonce ASC;`, startHeight, endHeight)
if err != nil {
return nil, err
}
@@ -135,7 +127,7 @@ func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight u
func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND NOT is_deleted;`, nonce)
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND deleted_at IS NULL;`, nonce)
err := row.StructScan(result)
if err != nil {
return nil, err
@@ -145,7 +137,7 @@ func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error)
func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2SentMsg, error) {
result := &L2SentMsg{}
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND NOT is_deleted order by nonce desc limit 1`, endBlockNumber)
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND deleted_at IS NULL order by nonce desc limit 1`, endBlockNumber)
err := row.StructScan(result)
if err != nil {
return nil, err
@@ -154,6 +146,6 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2Sen
}
func (l *l2SentMsgOrm) DeleteL2SentMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET is_deleted = true WHERE height > $1;`, height)
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET deleted_at = current_timestamp WHERE height > $1;`, height)
return err
}

View File

@@ -8,6 +8,13 @@ import (
"github.com/jmoiron/sqlx"
)
type RelayedMsg struct {
MsgHash string `json:"msg_hash" db:"msg_hash"`
Height uint64 `json:"height" db:"height"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
}
type relayedMsgOrm struct {
db *sqlx.DB
}
@@ -33,7 +40,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
}
_, err = dbTx.NamedExec(`insert into relayed_msg(msg_hash, height, layer1_hash, layer2_hash) values(:msg_hash, :height, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil {
log.Error("BatchInsertRelayedMsgDBTx: failed to insert l1 cross msgs", "msg_Hashe", "err", err)
log.Error("BatchInsertRelayedMsgDBTx: failed to insert relayed msgs", "err", err)
return err
}
return nil
@@ -41,7 +48,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error) {
result := &RelayedMsg{}
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msg_hash)
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msg_hash)
if err := row.StructScan(result); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -52,7 +59,7 @@ func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error
}
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -67,7 +74,7 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
}
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
var result sql.NullInt64
if err := row.Scan(&result); err != nil {
if err == sql.ErrNoRows || !result.Valid {
@@ -82,11 +89,11 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
}
func (l *relayedMsgOrm) DeleteL1RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer1_hash != '';`, height)
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer1_hash != '';`, height)
return err
}
func (l *relayedMsgOrm) DeleteL2RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer2_hash != '';`, height)
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer2_hash != '';`, height)
return err
}

View File

@@ -68,7 +68,7 @@ func (o *ormFactory) Beginx() (*sqlx.Tx, error) {
func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, error) {
var count uint64
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND NOT is_deleted;`, sender)
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND deleted_at IS NULL;`, sender)
if err := row.Scan(&count); err != nil {
return 0, err
}
@@ -78,7 +78,7 @@ func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, erro
func (o *ormFactory) GetCrossMsgsByAddressWithOffset(sender string, offset int64, limit int64) ([]*orm.CrossMsg, error) {
para := sender
var results []*orm.CrossMsg
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND NOT is_deleted ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND deleted_at IS NULL ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
if err != nil || rows == nil {
return nil, err
}

View File

@@ -77,7 +77,7 @@ func GetCrossTxClaimInfo(msgHash string, db db.OrmFactory) *UserClaimInfo {
Value: l2sentMsg.Value,
Nonce: strconv.FormatUint(l2sentMsg.Nonce, 10),
Message: l2sentMsg.MsgData,
Proof: l2sentMsg.MsgProof,
Proof: "0x" + l2sentMsg.MsgProof,
BatchHash: batch.BatchHash,
BatchIndex: strconv.FormatUint(l2sentMsg.BatchIndex, 10),
}

View File

@@ -18,6 +18,11 @@ type MsgHashWrapper struct {
TxHash common.Hash
}
type L2SentMsgWrapper struct {
L2SentMsg *orm.L2SentMsg
TxHash common.Hash
}
type CachedParsedTxCalldata struct {
CallDataIndex uint64
BatchIndices []uint64
@@ -81,7 +86,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer1Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
})
case backendabi.L1DepositERC1155Sig:
event := backendabi.ERC1155MessageEvent{}
@@ -98,7 +103,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer1Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
Amount: event.Amount.String(),
})
case backendabi.L1SentMessageEventSignature:
@@ -131,15 +136,14 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
return l1CrossMsg, msgHashes, relayedMsgs, nil
}
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrapper, []*orm.RelayedMsg, []*orm.L2SentMsg, error) {
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []*orm.RelayedMsg, []L2SentMsgWrapper, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
var l2CrossMsg []*orm.CrossMsg
// this is use to confirm finalized l1 msg
var relayedMsgs []*orm.RelayedMsg
var l2SentMsg []*orm.L2SentMsg
var msgHashes []MsgHashWrapper
var l2SentMsg []L2SentMsgWrapper
for _, vlog := range logs {
switch vlog.Topics[0] {
case backendabi.L2WithdrawETHSig:
@@ -147,7 +151,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ETHGatewayABI, &event, "WithdrawETH", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawETH event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -162,7 +166,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2StandardERC20GatewayABI, &event, "WithdrawERC20", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC20 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -179,7 +183,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ERC721GatewayABI, &event, "WithdrawERC721", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC721 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -189,14 +193,14 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer2Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
})
case backendabi.L2WithdrawERC1155Sig:
event := backendabi.ERC1155MessageEvent{}
err := UnpackLog(backendabi.L2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC1155 event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
Height: vlog.BlockNumber,
@@ -206,7 +210,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
Layer2Hash: vlog.TxHash.Hex(),
Layer1Token: event.L1Token.Hex(),
Layer2Token: event.L2Token.Hex(),
TokenID: event.TokenID.Uint64(),
TokenIDs: []string{event.TokenID.String()},
Amount: event.Amount.String(),
})
case backendabi.L2SentMessageEventSignature:
@@ -214,27 +218,28 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "SentMessage", vlog)
if err != nil {
log.Warn("Failed to unpack SentMessage event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
msgHash := ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message)
msgHashes = append(msgHashes, MsgHashWrapper{
MsgHash: msgHash,
TxHash: vlog.TxHash})
l2SentMsg = append(l2SentMsg, &orm.L2SentMsg{
Sender: event.Sender.Hex(),
Target: event.Target.Hex(),
Value: event.Value.String(),
MsgHash: msgHash.Hex(),
Height: vlog.BlockNumber,
Nonce: event.MessageNonce.Uint64(),
MsgData: hexutil.Encode(event.Message),
})
l2SentMsg = append(l2SentMsg,
L2SentMsgWrapper{
TxHash: vlog.TxHash,
L2SentMsg: &orm.L2SentMsg{
Sender: event.Sender.Hex(),
Target: event.Target.Hex(),
Value: event.Value.String(),
MsgHash: msgHash.Hex(),
Height: vlog.BlockNumber,
Nonce: event.MessageNonce.Uint64(),
MsgData: hexutil.Encode(event.Message),
},
})
case backendabi.L2RelayedMessageEventSignature:
event := backendabi.L2RelayedMessageEvent{}
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "RelayedMessage", vlog)
if err != nil {
log.Warn("Failed to unpack RelayedMessage event", "err", err)
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
return l2CrossMsg, relayedMsgs, l2SentMsg, err
}
relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{
MsgHash: event.MessageHash.String(),
@@ -244,7 +249,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
}
}
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, nil
return l2CrossMsg, relayedMsgs, l2SentMsg, nil
}
func ParseBatchInfoFromScrollChain(ctx context.Context, client *ethclient.Client, logs []types.Log) ([]*orm.RollupBatch, error) {

View File

@@ -21,6 +21,7 @@ import (
)
var app *cli.App
var logger log.Logger
func init() {
// Set up event-watcher app info.
@@ -32,7 +33,9 @@ func init() {
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
var err error
logger, err = cutils.LogSetup(ctx)
return err
}
// Register `event-watcher-test` app for integration-test.
cutils.RegisterSimulation(app, cutils.EventWatcherApp)
@@ -48,7 +51,7 @@ func action(ctx *cli.Context) error {
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := utils.InitDB(cfg.DBConfig)
db, err := utils.InitDB(cfg.DBConfig, logger)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}

View File

@@ -22,6 +22,7 @@ import (
)
var app *cli.App
var logger log.Logger
func init() {
// Set up gas-oracle app info.
@@ -34,7 +35,9 @@ func init() {
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
var err error
logger, err = cutils.LogSetup(ctx)
return err
}
// Register `gas-oracle-test` app for integration-test.
cutils.RegisterSimulation(app, cutils.GasOracleApp)
@@ -49,7 +52,7 @@ func action(ctx *cli.Context) error {
}
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := utils.InitDB(cfg.DBConfig)
db, err := utils.InitDB(cfg.DBConfig, logger)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}

View File

@@ -20,6 +20,7 @@ import (
)
var app *cli.App
var logger log.Logger
func init() {
// Set up message-relayer app info.
@@ -32,7 +33,9 @@ func init() {
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
var err error
logger, err = cutils.LogSetup(ctx)
return err
}
// Register `message-relayer-test` app for integration-test.
cutils.RegisterSimulation(app, cutils.MessageRelayerApp)
@@ -48,7 +51,7 @@ func action(ctx *cli.Context) error {
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := utils.InitDB(cfg.DBConfig)
db, err := utils.InitDB(cfg.DBConfig, logger)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}

View File

@@ -22,6 +22,7 @@ import (
)
var app *cli.App
var logger log.Logger
func init() {
// Set up rollup-relayer app info.
@@ -34,7 +35,9 @@ func init() {
app.Flags = append(app.Flags, cutils.RollupRelayerFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
var err error
logger, err = cutils.LogSetup(ctx)
return err
}
// Register `rollup-relayer-test` app for integration-test.
cutils.RegisterSimulation(app, cutils.RollupRelayerApp)
@@ -50,7 +53,7 @@ func action(ctx *cli.Context) error {
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := utils.InitDB(cfg.DBConfig)
db, err := utils.InitDB(cfg.DBConfig, logger)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}

View File

@@ -49,7 +49,9 @@ var (
)
func setupL1RelayerDB(t *testing.T) *gorm.DB {
db, err := bridgeUtils.InitDB(cfg.DBConfig)
logger, err1 := utils.LogSetup(nil)
assert.NoError(t, err1)
db, err := bridgeUtils.InitDB(cfg.DBConfig, logger)
assert.NoError(t, err)
sqlDB, err := db.DB()
assert.NoError(t, err)

View File

@@ -24,7 +24,9 @@ import (
)
func setupL2RelayerDB(t *testing.T) *gorm.DB {
db, err := bridgeUtils.InitDB(cfg.DBConfig)
logger, err1 := utils.LogSetup(nil)
assert.NoError(t, err1)
db, err := bridgeUtils.InitDB(cfg.DBConfig, logger)
assert.NoError(t, err)
sqlDB, err := db.DB()
assert.NoError(t, err)

View File

@@ -10,6 +10,7 @@ import (
"gorm.io/gorm"
"scroll-tech/common/docker"
cutils "scroll-tech/common/utils"
"scroll-tech/bridge/internal/config"
"scroll-tech/bridge/internal/orm/migrate"
@@ -74,7 +75,9 @@ func setupEnv(t *testing.T) (err error) {
}
func setupDB(t *testing.T) *gorm.DB {
db, err := utils.InitDB(cfg.DBConfig)
logger, err1 := cutils.LogSetup(nil)
assert.NoError(t, err1)
db, err := utils.InitDB(cfg.DBConfig, logger)
assert.NoError(t, err)
sqlDB, err := db.DB()
assert.NoError(t, err)

View File

@@ -12,6 +12,7 @@ import (
"scroll-tech/common/docker"
"scroll-tech/common/types"
cutils "scroll-tech/common/utils"
"scroll-tech/bridge/internal/config"
"scroll-tech/bridge/internal/orm/migrate"
@@ -45,6 +46,10 @@ func TestMain(m *testing.M) {
func setupEnv(t *testing.T) {
base = docker.NewDockerApp()
base.RunDBImage(t)
logger, err1 := cutils.LogSetup(nil)
assert.NoError(t, err1)
var err error
db, err = utils.InitDB(
&config.DBConfig{
@@ -53,6 +58,7 @@ func setupEnv(t *testing.T) {
MaxOpenNum: base.DBConfig.MaxOpenNum,
MaxIdleNum: base.DBConfig.MaxIdleNum,
},
logger,
)
assert.NoError(t, err)
sqlDB, err := db.DB()

View File

@@ -1,17 +1,52 @@
package utils
import (
"context"
"time"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/utils"
"scroll-tech/bridge/internal/config"
)
type gormLogger struct {
gethLogger log.Logger
}
func (g *gormLogger) LogMode(level logger.LogLevel) logger.Interface {
return g
}
func (g *gormLogger) Info(_ context.Context, msg string, data ...interface{}) {
g.gethLogger.Info(msg, data)
}
func (g *gormLogger) Warn(_ context.Context, msg string, data ...interface{}) {
g.gethLogger.Warn(msg, data)
}
func (g *gormLogger) Error(_ context.Context, msg string, data ...interface{}) {
g.gethLogger.Error(msg, data)
}
func (g *gormLogger) Trace(_ context.Context, begin time.Time, fc func() (string, int64), err error) {
elapsed := time.Since(begin)
rows, sql := fc()
g.gethLogger.Debug("gorm", "line", utils.FileWithLineNum(), "cost", elapsed, "rows", sql, "sql", rows)
}
// InitDB init the db handler
func InitDB(config *config.DBConfig) (*gorm.DB, error) {
func InitDB(config *config.DBConfig, gethLogger log.Logger) (*gorm.DB, error) {
tmpGormLogger := gormLogger{
gethLogger: gethLogger,
}
db, err := gorm.Open(postgres.Open(config.DSN), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
Logger: &tmpGormLogger,
})
if err != nil {
return nil, err

View File

@@ -12,6 +12,7 @@ import (
"gorm.io/gorm"
"scroll-tech/common/docker"
cutils "scroll-tech/common/utils"
bcmd "scroll-tech/bridge/cmd"
"scroll-tech/bridge/internal/config"
@@ -52,7 +53,9 @@ func setupDB(t *testing.T) *gorm.DB {
MaxOpenNum: base.DBConfig.MaxOpenNum,
MaxIdleNum: base.DBConfig.MaxIdleNum,
}
db, err := utils.InitDB(cfg)
logger, err1 := cutils.LogSetup(nil)
assert.NoError(t, err1)
db, err := utils.InitDB(cfg, logger)
assert.NoError(t, err)
sqlDB, err := db.DB()
assert.NoError(t, err)

View File

@@ -1,56 +0,0 @@
package utils
import (
"context"
"fmt"
"math/big"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/rpc"
)
type ethClient interface {
BlockNumber(ctx context.Context) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
// GetLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type.
func GetLatestConfirmedBlockNumber(ctx context.Context, client ethClient, confirm rpc.BlockNumber) (uint64, error) {
switch true {
case confirm == rpc.SafeBlockNumber || confirm == rpc.FinalizedBlockNumber:
var tag *big.Int
if confirm == rpc.FinalizedBlockNumber {
tag = big.NewInt(int64(rpc.FinalizedBlockNumber))
} else {
tag = big.NewInt(int64(rpc.SafeBlockNumber))
}
header, err := client.HeaderByNumber(ctx, tag)
if err != nil {
return 0, err
}
if !header.Number.IsInt64() {
return 0, fmt.Errorf("received invalid block confirm: %v", header.Number)
}
return header.Number.Uint64(), nil
case confirm == rpc.LatestBlockNumber:
number, err := client.BlockNumber(ctx)
if err != nil {
return 0, err
}
return number, nil
case confirm.Int64() >= 0: // If it's positive integer, consider it as a certain confirm value.
number, err := client.BlockNumber(ctx)
if err != nil {
return 0, err
}
cfmNum := uint64(confirm.Int64())
if number >= cfmNum {
return number - cfmNum, nil
}
return 0, nil
default:
return 0, fmt.Errorf("unknown confirmation type: %v", confirm)
}
}

View File

@@ -1,134 +0,0 @@
package utils
import (
"context"
"encoding/json"
"math/big"
"testing"
"github.com/stretchr/testify/assert"
"github.com/scroll-tech/go-ethereum/common/math"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/rpc"
)
var (
tests = []struct {
input string
mustFail bool
expected rpc.BlockNumber
}{
{`"0x"`, true, rpc.BlockNumber(0)},
{`"0x0"`, false, rpc.BlockNumber(0)},
{`"0X1"`, false, rpc.BlockNumber(1)},
{`"0x00"`, true, rpc.BlockNumber(0)},
{`"0x01"`, true, rpc.BlockNumber(0)},
{`"0x1"`, false, rpc.BlockNumber(1)},
{`"0x12"`, false, rpc.BlockNumber(18)},
{`"0x7fffffffffffffff"`, false, rpc.BlockNumber(math.MaxInt64)},
{`"0x8000000000000000"`, true, rpc.BlockNumber(0)},
{"0", true, rpc.BlockNumber(0)},
{`"ff"`, true, rpc.BlockNumber(0)},
{`"safe"`, false, rpc.SafeBlockNumber},
{`"finalized"`, false, rpc.FinalizedBlockNumber},
{`"pending"`, false, rpc.PendingBlockNumber},
{`"latest"`, false, rpc.LatestBlockNumber},
{`"earliest"`, false, rpc.EarliestBlockNumber},
{`someString`, true, rpc.BlockNumber(0)},
{`""`, true, rpc.BlockNumber(0)},
{``, true, rpc.BlockNumber(0)},
}
)
func TestUnmarshalJSON(t *testing.T) {
for i, test := range tests {
var num rpc.BlockNumber
err := json.Unmarshal([]byte(test.input), &num)
if test.mustFail && err == nil {
t.Errorf("Test %d should fail", i)
continue
}
if !test.mustFail && err != nil {
t.Errorf("Test %d should pass but got err: %v", i, err)
continue
}
if num != test.expected {
t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num)
}
}
}
func TestMarshalJSON(t *testing.T) {
for i, test := range tests {
var num rpc.BlockNumber
want, err := json.Marshal(test.expected)
assert.NoError(t, err)
if !test.mustFail {
err = json.Unmarshal([]byte(test.input), &num)
assert.NoError(t, err)
got, err := json.Marshal(&num)
assert.NoError(t, err)
if string(want) != string(got) {
t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num)
}
}
}
}
type MockEthClient struct {
val uint64
}
func (e MockEthClient) BlockNumber(ctx context.Context) (uint64, error) {
return e.val, nil
}
func (e MockEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
var blockNumber int64
switch number.Int64() {
case int64(rpc.LatestBlockNumber):
blockNumber = int64(e.val)
case int64(rpc.SafeBlockNumber):
blockNumber = int64(e.val) - 6
case int64(rpc.FinalizedBlockNumber):
blockNumber = int64(e.val) - 12
default:
blockNumber = number.Int64()
}
if blockNumber < 0 {
blockNumber = 0
}
return &types.Header{Number: new(big.Int).SetInt64(blockNumber)}, nil
}
func TestGetLatestConfirmedBlockNumber(t *testing.T) {
ctx := context.Background()
client := MockEthClient{}
testCases := []struct {
blockNumber uint64
confirmation rpc.BlockNumber
expectedResult uint64
}{
{5, 6, 0},
{7, 6, 1},
{10, 2, 8},
{0, 1, 0},
{3, 0, 3},
{15, 15, 0},
{16, rpc.SafeBlockNumber, 10},
{22, rpc.FinalizedBlockNumber, 10},
{10, rpc.LatestBlockNumber, 10},
{5, rpc.SafeBlockNumber, 0},
{11, rpc.FinalizedBlockNumber, 0},
}
for _, testCase := range testCases {
client.val = testCase.blockNumber
confirmed, err := GetLatestConfirmedBlockNumber(ctx, &client, testCase.confirmation)
assert.NoError(t, err)
assert.Equal(t, testCase.expectedResult, confirmed)
}
}

View File

@@ -1,65 +0,0 @@
package utils
import (
"fmt"
"math/big"
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
bridgeabi "scroll-tech/bridge/abi"
)
// Keccak2 compute the keccack256 of two concatenations of bytes32
func Keccak2(a common.Hash, b common.Hash) common.Hash {
return common.BytesToHash(crypto.Keccak256(append(a.Bytes()[:], b.Bytes()[:]...)))
}
// ComputeMessageHash compute the message hash
func ComputeMessageHash(
sender common.Address,
target common.Address,
value *big.Int,
messageNonce *big.Int,
message []byte,
) common.Hash {
data, _ := bridgeabi.L2ScrollMessengerABI.Pack("relayMessage", sender, target, value, messageNonce, message)
return common.BytesToHash(crypto.Keccak256(data))
}
// BufferToUint256Le convert bytes array to uint256 array assuming little-endian
func BufferToUint256Le(buffer []byte) []*big.Int {
buffer256 := make([]*big.Int, len(buffer)/32)
for i := 0; i < len(buffer)/32; i++ {
v := big.NewInt(0)
shft := big.NewInt(1)
for j := 0; j < 32; j++ {
v = new(big.Int).Add(v, new(big.Int).Mul(shft, big.NewInt(int64(buffer[i*32+j]))))
shft = new(big.Int).Mul(shft, big.NewInt(256))
}
buffer256[i] = v
}
return buffer256
}
// UnpackLog unpacks a retrieved log into the provided output structure.
// @todo: add unit test.
func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error {
if log.Topics[0] != c.Events[event].ID {
return fmt.Errorf("event signature mismatch")
}
if len(log.Data) > 0 {
if err := c.UnpackIntoInterface(out, event, log.Data); err != nil {
return err
}
}
var indexed abi.Arguments
for _, arg := range c.Events[event].Inputs {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
return abi.ParseTopics(out, indexed, log.Topics[1:])
}

View File

@@ -1,49 +0,0 @@
package utils
import (
"math/big"
"testing"
"github.com/scroll-tech/go-ethereum/common"
"github.com/stretchr/testify/assert"
)
func TestKeccak2(t *testing.T) {
hash := Keccak2(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
if hash != common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5") {
t.Fatalf("Invalid keccak, want %s, got %s", "0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5", hash.Hex())
}
hash = Keccak2(common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"), common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"))
if hash != common.HexToHash("0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30") {
t.Fatalf("Invalid keccak, want %s, got %s", "0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30", hash.Hex())
}
hash = Keccak2(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"), common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002"))
if hash != common.HexToHash("0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0") {
t.Fatalf("Invalid keccak, want %s, got %s", "0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0", hash.Hex())
}
}
func TestComputeMessageHash(t *testing.T) {
hash := ComputeMessageHash(
common.HexToAddress("0x1C5A77d9FA7eF466951B2F01F724BCa3A5820b63"),
common.HexToAddress("0x4592D8f8D7B001e72Cb26A73e4Fa1806a51aC79d"),
big.NewInt(0),
big.NewInt(1),
[]byte("testbridgecontract"),
)
assert.Equal(t, "0xda253c04595a49017bb54b1b46088c69752b5ad2f0c47971ac76b8b25abec202", hash.String())
}
func TestBufferToUint256Le(t *testing.T) {
input := []byte{
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}
expectedOutput := []*big.Int{big.NewInt(1)}
result := BufferToUint256Le(input)
assert.Equal(t, expectedOutput, result)
}

View File

@@ -5,8 +5,6 @@ import (
"database/sql"
"fmt"
"time"
"scroll-tech/common/types/message"
)
// L1BlockStatus represents current l1 block processing status
@@ -161,11 +159,18 @@ type RollerStatus struct {
// SessionInfo is assigned rollers info of a block batch (session)
type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
ProveType message.ProveType `json:"prove_type,omitempty"`
ID int `json:"id" db:"id"`
TaskID string `json:"task_id" db:"task_id"`
RollerPublicKey string `json:"roller_public_key" db:"roller_public_key"`
ProveType int16 `json:"prove_type" db:"prove_type"`
RollerName string `json:"roller_name" db:"roller_name"`
ProvingStatus int16 `json:"proving_status" db:"proving_status"`
FailureType int16 `json:"failure_type" db:"failure_type"`
Reward uint64 `json:"reward" db:"reward"`
Proof []byte `json:"proof" db:"proof"`
CreatedAt *time.Time `json:"created_at" db:"created_at"`
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)

View File

@@ -13,7 +13,7 @@ import (
)
// LogSetup is for setup logger
func LogSetup(ctx *cli.Context) error {
func LogSetup(ctx *cli.Context) (log.Logger, error) {
var ostream log.Handler
if logFile := ctx.String(LogFileFlag.Name); len(logFile) > 0 {
fp, err := os.OpenFile(filepath.Clean(logFile), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
@@ -38,6 +38,7 @@ func LogSetup(ctx *cli.Context) error {
glogger := log.NewGlogHandler(ostream)
// Set log level
glogger.Verbosity(log.Lvl(ctx.Int(VerbosityFlag.Name)))
log.Root().SetHandler(glogger)
return nil
logger := log.Root()
logger.SetHandler(glogger)
return logger, nil
}

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.0.6"
var tag = "v4.0.7"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -51,10 +51,12 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
PublicKey: pk,
}
for id, sess := range m.sessions {
if _, ok := sess.info.Rollers[pk]; ok {
info.ActiveSessionStartTime = time.Unix(sess.info.StartTimestamp, 0)
info.ActiveSession = id
break
for _, sessionInfo := range sess.sessionInfos {
if sessionInfo.RollerPublicKey == pk {
info.ActiveSessionStartTime = *sessionInfo.CreatedAt
info.ActiveSession = id
break
}
}
}
res = append(res, info)
@@ -66,14 +68,14 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
func newSessionInfo(sess *session, status types.ProvingStatus, errMsg string, finished bool) *SessionInfo {
now := time.Now()
var nameList []string
for pk := range sess.info.Rollers {
nameList = append(nameList, sess.info.Rollers[pk].Name)
for _, sessionInfo := range sess.sessionInfos {
nameList = append(nameList, sessionInfo.RollerName)
}
info := SessionInfo{
ID: sess.info.ID,
ID: sess.taskID,
Status: status.String(),
AssignedRollers: nameList,
StartTime: time.Unix(sess.info.StartTimestamp, 0),
StartTime: *sess.sessionInfos[0].CreatedAt,
Error: errMsg,
}
if finished {

View File

@@ -7,7 +7,6 @@ import (
"os/signal"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
@@ -36,7 +35,8 @@ func init() {
app.Flags = append(app.Flags, apiFlags...)
app.Before = func(ctx *cli.Context) error {
return utils.LogSetup(ctx)
_, err := utils.LogSetup(ctx)
return err
}
// Register `coordinator-test` app for integration-test.

View File

@@ -57,7 +57,8 @@ type rollerProofStatus struct {
// Contains all the information on an ongoing proof generation session.
type session struct {
info *types.SessionInfo
taskID string
sessionInfos []*types.SessionInfo
// finish channel is used to pass the public key of the rollers who finished proving process.
finishChan chan rollerProofStatus
}
@@ -248,24 +249,21 @@ func (m *Manager) restorePrevSessions() {
log.Error("failed to recover roller session info from db", "error", err)
return
}
sessionInfosMaps := make(map[string][]*types.SessionInfo)
for _, v := range prevSessions {
log.Info("restore roller info for session", "session start time", v.CreatedAt, "session id", v.TaskID, "roller name",
v.RollerName, "prove type", v.ProveType, "public key", v.RollerPublicKey, "proof status", v.ProvingStatus)
sessionInfosMaps[v.TaskID] = append(sessionInfosMaps[v.TaskID], v)
}
for taskID, sessionInfos := range sessionInfosMaps {
sess := &session{
info: v,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.sessions[sess.info.ID] = sess
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
for _, roller := range sess.info.Rollers {
log.Info(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"prove type", sess.info.ProveType,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
m.sessions[taskID] = sess
go m.CollectProofs(sess)
}
@@ -287,36 +285,40 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if !ok {
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
}
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
var tmpSessionInfo *types.SessionInfo
for _, si := range sess.sessionInfos {
// get the send session info of this proof msg
if si.TaskID == msg.ID && si.RollerPublicKey == pk {
tmpSessionInfo = si
}
}
if tmpSessionInfo == nil {
return fmt.Errorf("proof generation session for id %v pk:%s does not existID", msg.ID, pk)
}
proofTime := time.Since(*tmpSessionInfo.CreatedAt)
proofTimeSec := uint64(proofTime.Seconds())
// Ensure this roller is eligible to participate in the session.
roller, ok := sess.info.Rollers[pk]
if !ok {
return fmt.Errorf("roller %s %s (%s) is not eligible to partake in proof session %v", roller.Name, sess.info.ProveType, roller.PublicKey, msg.ID)
}
if roller.Status == types.RollerProofValid {
if types.RollerProveStatus(tmpSessionInfo.ProvingStatus) == types.RollerProofValid {
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
// TODO: Defend invalid proof resubmissions by one of the following two methods:
// (i) slash the roller for each submission of invalid proof
// (ii) set the maximum failure retry times
log.Warn(
"roller has already submitted valid proof in proof session",
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey,
"prove type", tmpSessionInfo.ProveType,
"proof id", msg.ID,
)
return nil
}
log.Info(
"handling zk proof",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"proof time", proofTimeSec,
)
log.Info("handling zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName, "roller pk",
tmpSessionInfo.RollerPublicKey, "prove type", tmpSessionInfo.ProveType, "proof time", proofTimeSec)
defer func() {
// TODO: maybe we should use db tx for the whole process?
@@ -344,12 +346,12 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if msg.Status != message.StatusOk {
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsGeneratedFailedTimeTimer(roller.PublicKey, proofTime)
m.updateMetricRollerProofsGeneratedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info(
"proof generated by roller failed",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey,
"prove type", msg.Type,
"proof time", proofTimeSec,
"error", msg.Error,
@@ -383,8 +385,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if verifyErr != nil {
// TODO: this is only a temp workaround for testnet, we should return err in real cases
success = false
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
// TODO: Roller needs to be slashed if proof is invalid.
}
@@ -411,18 +413,32 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
}
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
} else {
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
}
return nil
}
// checkAttempts use the count of session info to check the attempts
func (m *Manager) checkAttemptsExceeded(hash string) bool {
sessionInfos, err := m.orm.GetSessionInfosByHashes([]string{hash})
if err != nil {
log.Error("get session info error", "hash id", hash, "error", err)
return true
}
if len(sessionInfos) >= int(m.cfg.SessionAttempts) {
return true
}
return false
}
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
coordinatorSessionsActiveNumberGauge.Inc(1)
@@ -432,48 +448,47 @@ func (m *Manager) CollectProofs(sess *session) {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if !m.checkAttemptsExceeded(sess.taskID) {
var success bool
if sess.info.ProveType == message.AggregatorProve {
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
success = m.StartAggProofGenerationSession(nil, sess)
} else if sess.info.ProveType == message.BasicProve {
} else if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
success = m.StartBasicProofGenerationSession(nil, sess)
}
if success {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, v := range sess.sessionInfos {
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
log.Info("Retrying session", "session id:", sess.taskID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
log.Warn(errMsg, "session id", sess.info.ID)
log.Warn(errMsg, "session id", sess.taskID)
// Set status as skipped.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
if sess.info.ProveType == message.BasicProve {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset basic task_status as Unassigned", "id", sess.info.ID, "err", err)
if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
if err := m.orm.UpdateProvingStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset basic task_status as Unassigned", "id", sess.taskID, "err", err)
}
}
if sess.info.ProveType == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.info.ID, "err", err)
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.taskID, "err", err)
}
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, v := range sess.sessionInfos {
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
}
delete(m.sessions, sess.info.ID)
delete(m.sessions, sess.taskID)
m.mu.Unlock()
coordinatorSessionsTimeoutTotalCounter.Inc(1)
return
@@ -481,7 +496,12 @@ func (m *Manager) CollectProofs(sess *session) {
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
case ret := <-sess.finishChan:
m.mu.Lock()
sess.info.Rollers[ret.pk].Status = ret.status
for idx := range sess.sessionInfos {
if sess.sessionInfos[idx].RollerPublicKey == ret.pk {
sess.sessionInfos[idx].ProvingStatus = int16(ret.status)
}
}
if sess.isSessionFailed() {
if ret.typ == message.BasicProve {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
@@ -493,12 +513,13 @@ func (m *Manager) CollectProofs(sess *session) {
log.Error("failed to update aggregator proving_status as failed", "msg.ID", ret.id, "error", err)
}
}
coordinatorSessionsFailedTotalCounter.Inc(1)
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
if err := m.orm.UpdateSessionInfoProvingStatus(m.ctx, ret.typ, ret.id, ret.pk, ret.status); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
//Check if all rollers have finished their tasks, and rollers with valid results are indexed by public key.
finished, validRollers := sess.isRollersFinished()
@@ -508,11 +529,10 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := rand.Int63n(int64(len(validRollers)))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
for _, sessionInfo := range sess.sessionInfos {
m.freeTaskIDForRoller(sessionInfo.RollerPublicKey, sessionInfo.TaskID)
delete(m.sessions, sessionInfo.TaskID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsSuccessTotalCounter.Inc(1)
@@ -528,14 +548,16 @@ func (m *Manager) CollectProofs(sess *session) {
// validRollers also records the public keys of rollers who have finished their tasks correctly as index.
func (s *session) isRollersFinished() (bool, []string) {
var validRollers []string
for pk, roller := range s.info.Rollers {
if roller.Status == types.RollerProofValid {
validRollers = append(validRollers, pk)
for _, sessionInfo := range s.sessionInfos {
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofValid {
validRollers = append(validRollers, sessionInfo.RollerPublicKey)
continue
}
if roller.Status == types.RollerProofInvalid {
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofInvalid {
continue
}
// Some rollers are still proving.
return false, nil
}
@@ -543,8 +565,8 @@ func (s *session) isRollersFinished() (bool, []string) {
}
func (s *session) isSessionFailed() bool {
for _, roller := range s.info.Rollers {
if roller.Status != types.RollerProofInvalid {
for _, sessionInfo := range s.sessionInfos {
if types.RollerProveStatus(sessionInfo.ProvingStatus) != types.RollerProofInvalid {
return false
}
}
@@ -573,7 +595,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
if task != nil {
taskID = task.Hash
} else {
taskID = prevSession.info.ID
taskID = prevSession.taskID
}
if m.GetNumberOfIdleRollers(message.BasicProve) == 0 {
log.Warn("no idle basic roller when starting proof generation session", "id", taskID)
@@ -612,7 +634,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
}
// Dispatch task to basic rollers.
rollers := make(map[string]*types.RollerStatus)
var sessionInfos []*types.SessionInfo
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.BasicProve)
if roller == nil {
@@ -626,10 +648,27 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
now := time.Now()
tmpSessionInfo := types.SessionInfo{
TaskID: taskID,
RollerPublicKey: roller.PublicKey,
ProveType: int16(message.BasicProve),
RollerName: roller.Name,
CreatedAt: &now,
ProvingStatus: int16(types.RollerAssigned),
}
// Store session info.
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
log.Error("db set session info fail", "session id", taskID, "error", err)
return false
}
sessionInfos = append(sessionInfos, &tmpSessionInfo)
log.Info("assigned proof to roller", "session id", taskID, "session type", message.BasicProve, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
}
// No roller assigned.
if len(rollers) == 0 {
if len(sessionInfos) == 0 {
log.Error("no roller assigned", "id", taskID, "number of idle basic rollers", m.GetNumberOfIdleRollers(message.BasicProve))
return false
}
@@ -642,33 +681,9 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskID,
Rollers: rollers,
ProveType: message.BasicProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.mu.Lock()
@@ -685,7 +700,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
if task != nil {
taskID = task.ID
} else {
taskID = prevSession.info.ID
taskID = prevSession.taskID
}
if m.GetNumberOfIdleRollers(message.AggregatorProve) == 0 {
log.Warn("no idle common roller when starting proof generation session", "id", taskID)
@@ -715,7 +730,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
}
// Dispatch task to basic rollers.
rollers := make(map[string]*types.RollerStatus)
var sessionInfos []*types.SessionInfo
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.AggregatorProve)
if roller == nil {
@@ -732,11 +747,29 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskID)
continue
}
now := time.Now()
tmpSessionInfo := types.SessionInfo{
TaskID: taskID,
RollerPublicKey: roller.PublicKey,
ProveType: int16(message.AggregatorProve),
RollerName: roller.Name,
CreatedAt: &now,
ProvingStatus: int16(types.RollerAssigned),
}
// Store session info.
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
log.Error("db set session info fail", "session id", taskID, "error", err)
return false
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
sessionInfos = append(sessionInfos, &tmpSessionInfo)
log.Info("assigned proof to roller", "session id", taskID, "session type", message.AggregatorProve, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
}
// No roller assigned.
if len(rollers) == 0 {
if len(sessionInfos) == 0 {
log.Error("no roller assigned", "id", taskID, "number of idle aggregator rollers", m.GetNumberOfIdleRollers(message.AggregatorProve))
return false
}
@@ -749,33 +782,9 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskID,
Rollers: rollers,
ProveType: message.AggregatorProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
taskID: taskID,
sessionInfos: sessionInfos,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.mu.Lock()
@@ -789,7 +798,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
func (m *Manager) addFailedSession(sess *session, errMsg string) {
m.mu.Lock()
defer m.mu.Unlock()
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
m.failedSessionInfos[sess.taskID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
}
// VerifyToken verifies pukey for token and expiration time

View File

@@ -53,8 +53,8 @@ func (m *Manager) reloadRollerAssignedTasks(pubkey string) *cmap.ConcurrentMap {
defer m.mu.RUnlock()
taskIDs := cmap.New()
for id, sess := range m.sessions {
for pk, roller := range sess.info.Rollers {
if pk == pubkey && roller.Status == types.RollerAssigned {
for _, sessionInfo := range sess.sessionInfos {
if sessionInfo.RollerPublicKey == pubkey && sessionInfo.ProvingStatus == int16(types.RollerAssigned) {
taskIDs.Set(id, struct{}{})
}
}

View File

@@ -24,7 +24,8 @@ func init() {
app.Flags = append(app.Flags, utils.CommonFlags...)
app.Before = func(ctx *cli.Context) error {
return utils.LogSetup(ctx)
_, err := utils.LogSetup(ctx)
return err
}
app.Commands = []*cli.Command{

View File

@@ -3,12 +3,21 @@
create table session_info
(
hash VARCHAR NOT NULL,
rollers_info BYTEA NOT NULL
);
id BIGSERIAL PRIMARY KEY,
task_id VARCHAR NOT NULL,
roller_public_key VARCHAR NOT NULL,
prove_type SMALLINT DEFAULT 0,
roller_name VARCHAR NOT NULL,
proving_status SMALLINT DEFAULT 1,
failure_type SMALLINT DEFAULT 0,
reward BIGINT DEFAULT 0,
proof BYTEA DEFAULT NULL,
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(0) DEFAULT NULL,
create unique index session_info_hash_uindex
on session_info (hash);
CONSTRAINT uk_session_unique UNIQUE (task_id, roller_public_key)
);
-- +goose StatementEnd

View File

@@ -44,6 +44,7 @@ type BlockTraceOrm interface {
type SessionInfoOrm interface {
GetSessionInfosByHashes(hashes []string) ([]*types.SessionInfo, error)
SetSessionInfo(rollersInfo *types.SessionInfo) error
UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error
}
// AggTaskOrm is aggregator task

View File

@@ -1,11 +1,12 @@
package orm
import (
"encoding/json"
"context"
"github.com/jmoiron/sqlx"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
)
type sessionInfoOrm struct {
@@ -23,7 +24,7 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
if len(hashes) == 0 {
return nil, nil
}
query, args, err := sqlx.In("SELECT rollers_info FROM session_info WHERE hash IN (?);", hashes)
query, args, err := sqlx.In("SELECT * FROM session_info WHERE task_id IN (?);", hashes)
if err != nil {
return nil, err
}
@@ -35,15 +36,11 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
var sessionInfos []*types.SessionInfo
for rows.Next() {
var infoBytes []byte
if err = rows.Scan(&infoBytes); err != nil {
var sessionInfo types.SessionInfo
if err = rows.StructScan(&sessionInfo); err != nil {
return nil, err
}
sessionInfo := &types.SessionInfo{}
if err = json.Unmarshal(infoBytes, sessionInfo); err != nil {
return nil, err
}
sessionInfos = append(sessionInfos, sessionInfo)
sessionInfos = append(sessionInfos, &sessionInfo)
}
if err = rows.Err(); err != nil {
return nil, err
@@ -53,11 +50,16 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
}
func (o *sessionInfoOrm) SetSessionInfo(rollersInfo *types.SessionInfo) error {
infoBytes, err := json.Marshal(rollersInfo)
if err != nil {
return err
}
sqlStr := "INSERT INTO session_info (hash, rollers_info) VALUES ($1, $2) ON CONFLICT (hash) DO UPDATE SET rollers_info = EXCLUDED.rollers_info;"
_, err = o.db.Exec(sqlStr, rollersInfo.ID, infoBytes)
sqlStr := "INSERT INTO session_info (task_id, roller_public_key, prove_type, roller_name, proving_status, failure_type, reward, proof, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (task_id, roller_public_key) DO UPDATE SET proving_status = EXCLUDED.proving_status;"
_, err := o.db.Exec(sqlStr, rollersInfo.TaskID, rollersInfo.RollerPublicKey, rollersInfo.ProveType, rollersInfo.RollerName,
rollersInfo.ProvingStatus, rollersInfo.FailureType, rollersInfo.Reward, rollersInfo.Proof, rollersInfo.CreatedAt)
return err
}
// UpdateSessionInfoProvingStatus update the session info proving status
func (o *sessionInfoOrm) UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error {
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update session_info set proving_status = ? where prove_type = ? and task_id = ? and roller_public_key = ? ;"), int(proveType), int(status), taskID, pk); err != nil {
return err
}
return nil
}

View File

@@ -410,31 +410,29 @@ func testOrmSessionInfo(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(sessionInfos))
now := time.Now()
sessionInfo := types.SessionInfo{
ID: batchHash,
Rollers: map[string]*types.RollerStatus{
"0": {
PublicKey: "0",
Name: "roller-0",
Status: types.RollerAssigned,
},
},
StartTimestamp: time.Now().Unix()}
TaskID: batchHash,
RollerName: "roller-0",
RollerPublicKey: "0",
ProvingStatus: int16(types.RollerAssigned),
CreatedAt: &now,
}
// insert
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
assert.NoError(t, err)
assert.Equal(t, 1, len(sessionInfos))
assert.Equal(t, sessionInfo, *sessionInfos[0])
assert.Equal(t, sessionInfo.RollerName, sessionInfos[0].RollerName)
// update
sessionInfo.Rollers["0"].Status = types.RollerProofValid
sessionInfo.ProvingStatus = int16(types.RollerProofValid)
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
assert.NoError(t, err)
assert.Equal(t, 1, len(sessionInfos))
assert.Equal(t, sessionInfo, *sessionInfos[0])
assert.Equal(t, sessionInfo.ProvingStatus, sessionInfos[0].ProvingStatus)
// delete
assert.NoError(t, ormBatch.UpdateProvingStatus(batchHash, types.ProvingTaskVerified))

2
l2geth

Submodule l2geth updated: 983d630244...127af384ed

View File

@@ -26,7 +26,8 @@ func init() {
app.Version = version.Version
app.Flags = append(app.Flags, utils.CommonFlags...)
app.Before = func(ctx *cli.Context) error {
return utils.LogSetup(ctx)
_, err := utils.LogSetup(ctx)
return err
}
// Register `roller-test` app for integration-test.