mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 15:08:09 -05:00
Compare commits
2 Commits
fix_insert
...
feat/gorm_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
999f3ceaaf | ||
|
|
a5ab53161a |
@@ -75,7 +75,7 @@ func (c *CrossMsgFetcher) Start() {
|
||||
return
|
||||
case <-tick.C:
|
||||
c.mu.Lock()
|
||||
c.forwardFetchAndSaveMissingEvents(0)
|
||||
c.forwardFetchAndSaveMissingEvents(1)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
backendabi "bridge-history-api/abi"
|
||||
"bridge-history-api/db"
|
||||
"bridge-history-api/db/orm"
|
||||
"bridge-history-api/utils"
|
||||
)
|
||||
|
||||
@@ -104,6 +105,14 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
|
||||
log.Error("l1FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
|
||||
return err
|
||||
}
|
||||
for i := range depositL1CrossMsgs {
|
||||
for _, msgHash := range msgHashes {
|
||||
if depositL1CrossMsgs[i].Layer1Hash == msgHash.TxHash.Hex() {
|
||||
depositL1CrossMsgs[i].MsgHash = msgHash.MsgHash.Hex()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
dbTx, err := database.Beginx()
|
||||
if err != nil {
|
||||
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
|
||||
@@ -120,11 +129,6 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
|
||||
dbTx.Rollback()
|
||||
log.Crit("l1FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
|
||||
}
|
||||
err = updateL1CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
|
||||
if err != nil {
|
||||
dbTx.Rollback()
|
||||
log.Crit("l1FetchAndSaveEvents: Failed to update msgHash in L1 cross msg", "err", err)
|
||||
}
|
||||
err = dbTx.Commit()
|
||||
if err != nil {
|
||||
// if we can not insert into DB, there must something wrong, need a on-call member handle the dababase manually
|
||||
@@ -157,11 +161,22 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
|
||||
log.Warn("Failed to get l2 event logs", "err", err)
|
||||
return err
|
||||
}
|
||||
depositL2CrossMsgs, msgHashes, relayedMsg, l2sentMsgs, err := utils.ParseBackendL2EventLogs(logs)
|
||||
depositL2CrossMsgs, relayedMsg, L2SentMsgWrappers, err := utils.ParseBackendL2EventLogs(logs)
|
||||
if err != nil {
|
||||
log.Error("l2FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err)
|
||||
return err
|
||||
}
|
||||
var l2SentMsgs []*orm.L2SentMsg
|
||||
for i := range depositL2CrossMsgs {
|
||||
for _, l2SentMsgWrapper := range L2SentMsgWrappers {
|
||||
if depositL2CrossMsgs[i].Layer2Hash == l2SentMsgWrapper.TxHash.Hex() {
|
||||
depositL2CrossMsgs[i].MsgHash = l2SentMsgWrapper.L2SentMsg.MsgHash
|
||||
l2SentMsgWrapper.L2SentMsg.TxSender = depositL2CrossMsgs[i].Sender
|
||||
l2SentMsgs = append(l2SentMsgs, l2SentMsgWrapper.L2SentMsg)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
dbTx, err := database.Beginx()
|
||||
if err != nil {
|
||||
log.Error("l2FetchAndSaveEvents: Failed to begin db transaction", "err", err)
|
||||
@@ -179,16 +194,12 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas
|
||||
log.Crit("l2FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err)
|
||||
}
|
||||
|
||||
err = updateL2CrossMsgMsgHash(ctx, dbTx, database, msgHashes)
|
||||
if err != nil {
|
||||
dbTx.Rollback()
|
||||
log.Crit("l2FetchAndSaveEvents: Failed to update msgHash in L2 cross msg", "err", err)
|
||||
}
|
||||
|
||||
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2sentMsgs)
|
||||
if err != nil {
|
||||
dbTx.Rollback()
|
||||
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
|
||||
if len(l2SentMsgs) > 0 {
|
||||
err = database.BatchInsertL2SentMsgDBTx(dbTx, l2SentMsgs)
|
||||
if err != nil {
|
||||
dbTx.Rollback()
|
||||
log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = dbTx.Commit()
|
||||
|
||||
@@ -12,30 +12,32 @@ create table cross_message
|
||||
layer2_hash VARCHAR NOT NULL DEFAULT '',
|
||||
layer1_token VARCHAR NOT NULL DEFAULT '',
|
||||
layer2_token VARCHAR NOT NULL DEFAULT '',
|
||||
token_id BIGINT NOT NULL DEFAULT 0,
|
||||
asset SMALLINT NOT NULL,
|
||||
msg_type SMALLINT NOT NULL,
|
||||
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
-- use array to support nft bridge
|
||||
token_ids VARCHAR[] NOT NULL DEFAULT '{}',
|
||||
-- use array to support nft bridge
|
||||
token_amounts VARCHAR[] NOT NULL DEFAULT '{}',
|
||||
block_timestamp TIMESTAMP(0) DEFAULT NULL,
|
||||
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL
|
||||
);
|
||||
|
||||
create unique index uk_msg_hash_msg_type
|
||||
on cross_message (msg_hash, msg_type) where deleted_at IS NULL;
|
||||
|
||||
comment
|
||||
on column cross_message.asset is 'ETH, ERC20, ERC721, ERC1155';
|
||||
|
||||
comment
|
||||
on column cross_message.msg_type is 'unknown, l1msg, l2msg';
|
||||
|
||||
comment
|
||||
on column cross_message.is_deleted is 'NotDeleted false, Deleted true';
|
||||
CREATE INDEX idx_l1_msg_index ON cross_message (layer1_hash, deleted_at);
|
||||
|
||||
CREATE INDEX valid_l1_msg_index ON cross_message (layer1_hash, is_deleted);
|
||||
CREATE INDEX idx_l2_msg_index ON cross_message (layer2_hash, deleted_at);
|
||||
|
||||
CREATE INDEX valid_l2_msg_index ON cross_message (layer2_hash, is_deleted);
|
||||
|
||||
CREATE INDEX valid_height_index ON cross_message (height, msg_type, is_deleted);
|
||||
CREATE INDEX idx_height_msg_type_index ON cross_message (height, msg_type, deleted_at);
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_timestamp()
|
||||
RETURNS TRIGGER AS $$
|
||||
@@ -49,22 +51,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
|
||||
ON cross_message FOR EACH ROW EXECUTE PROCEDURE
|
||||
update_timestamp();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deleted_at_trigger()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
|
||||
UPDATE cross_message SET deleted_at = NOW() WHERE id = NEW.id;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER deleted_at_trigger
|
||||
AFTER UPDATE ON cross_message
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION deleted_at_trigger();
|
||||
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
|
||||
@@ -7,17 +7,17 @@ create table relayed_msg
|
||||
height BIGINT NOT NULL,
|
||||
layer1_hash VARCHAR NOT NULL DEFAULT '',
|
||||
layer2_hash VARCHAR NOT NULL DEFAULT '',
|
||||
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL
|
||||
);
|
||||
|
||||
comment
|
||||
on column relayed_msg.is_deleted is 'NotDeleted, Deleted';
|
||||
create unique index uk_msg_hash
|
||||
on relayed_msg (msg_hash) where deleted_at IS NULL;
|
||||
|
||||
create unique index relayed_msg_hash_uindex
|
||||
on relayed_msg (msg_hash);
|
||||
CREATE INDEX idx_l1_msg_index ON relayed_msg (layer1_hash, deleted_at);
|
||||
|
||||
CREATE INDEX idx_l2_msg_index ON relayed_msg (layer2_hash, deleted_at);
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_timestamp()
|
||||
RETURNS TRIGGER AS $$
|
||||
@@ -31,22 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
|
||||
ON relayed_msg FOR EACH ROW EXECUTE PROCEDURE
|
||||
update_timestamp();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deleted_at_trigger()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
|
||||
UPDATE relayed_msg SET deleted_at = NOW() WHERE id = NEW.id;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER deleted_at_trigger
|
||||
AFTER UPDATE ON relayed_msg
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION deleted_at_trigger();
|
||||
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
create table l2_sent_msg
|
||||
(
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
tx_sender VARCHAR NOT NULL,
|
||||
sender VARCHAR NOT NULL,
|
||||
target VARCHAR NOT NULL,
|
||||
value VARCHAR NOT NULL,
|
||||
@@ -12,14 +13,16 @@ create table l2_sent_msg
|
||||
batch_index BIGINT NOT NULL DEFAULT 0,
|
||||
msg_proof TEXT NOT NULL DEFAULT '',
|
||||
msg_data TEXT NOT NULL DEFAULT '',
|
||||
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL
|
||||
);
|
||||
|
||||
comment
|
||||
on column l2_sent_msg.is_deleted is 'NotDeleted, Deleted';
|
||||
create unique index uk_msg_hash
|
||||
on l2_sent_msg (msg_hash) where deleted_at IS NULL;
|
||||
|
||||
create unique index uk_nonce
|
||||
on l2_sent_msg (nonce) where deleted_at IS NULL;
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_timestamp()
|
||||
RETURNS TRIGGER AS $$
|
||||
@@ -33,22 +36,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
|
||||
ON l2_sent_msg FOR EACH ROW EXECUTE PROCEDURE
|
||||
update_timestamp();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deleted_at_trigger()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
|
||||
UPDATE l2_sent_msg SET deleted_at = NOW() WHERE id = NEW.id;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER deleted_at_trigger
|
||||
AFTER UPDATE ON l2_sent_msg
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION deleted_at_trigger();
|
||||
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
|
||||
@@ -8,12 +8,17 @@ create table rollup_batch
|
||||
start_block_number BIGINT NOT NULL,
|
||||
end_block_number BIGINT NOT NULL,
|
||||
batch_hash VARCHAR NOT NULL,
|
||||
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL
|
||||
);
|
||||
|
||||
create unique index uk_batch_index
|
||||
on rollup_batch (batch_index) where deleted_at IS NULL;
|
||||
|
||||
create unique index uk_batch_hash
|
||||
on rollup_batch (batch_hash) where deleted_at IS NULL;
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_timestamp()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -26,21 +31,6 @@ CREATE TRIGGER update_timestamp BEFORE UPDATE
|
||||
ON rollup_batch FOR EACH ROW EXECUTE PROCEDURE
|
||||
update_timestamp();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deleted_at_trigger()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN
|
||||
UPDATE rollup_batch SET deleted_at = NOW() WHERE id = NEW.id;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER deleted_at_trigger
|
||||
AFTER UPDATE ON rollup_batch
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION deleted_at_trigger();
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
|
||||
@@ -2,7 +2,6 @@ package orm
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/jmoiron/sqlx"
|
||||
@@ -40,14 +39,6 @@ func (b *rollupBatchOrm) BatchInsertRollupBatchDBTx(dbTx *sqlx.Tx, batches []*Ro
|
||||
"start_block_number": batch.StartBlockNumber,
|
||||
"end_block_number": batch.EndBlockNumber,
|
||||
}
|
||||
var exists bool
|
||||
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM rollup_batch WHERE batch_index = $1 AND NOT is_deleted)`, batch.BatchIndex).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("BatchInsertRollupBatchDBTx: batch index %v already exists at height %v", batch.BatchIndex, batch.CommitHeight)
|
||||
}
|
||||
}
|
||||
_, err = dbTx.NamedExec(`insert into rollup_batch(commit_height, batch_index, batch_hash, start_block_number, end_block_number) values(:commit_height, :batch_index, :batch_hash, :start_block_number, :end_block_number);`, batchMaps)
|
||||
if err != nil {
|
||||
|
||||
@@ -40,31 +40,24 @@ const (
|
||||
|
||||
// CrossMsg represents a cross message from layer 1 to layer 2
|
||||
type CrossMsg struct {
|
||||
ID uint64 `json:"id" db:"id"`
|
||||
MsgHash string `json:"msg_hash" db:"msg_hash"`
|
||||
Height uint64 `json:"height" db:"height"`
|
||||
Sender string `json:"sender" db:"sender"`
|
||||
Target string `json:"target" db:"target"`
|
||||
Amount string `json:"amount" db:"amount"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Layer1Token string `json:"layer1_token" db:"layer1_token"`
|
||||
Layer2Token string `json:"layer2_token" db:"layer2_token"`
|
||||
TokenID uint64 `json:"token_id" db:"token_id"`
|
||||
Asset int `json:"asset" db:"asset"`
|
||||
MsgType int `json:"msg_type" db:"msg_type"`
|
||||
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
|
||||
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
|
||||
}
|
||||
|
||||
type RelayedMsg struct {
|
||||
MsgHash string `json:"msg_hash" db:"msg_hash"`
|
||||
Height uint64 `json:"height" db:"height"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
ID uint64 `json:"id" db:"id"`
|
||||
MsgHash string `json:"msg_hash" db:"msg_hash"`
|
||||
Height uint64 `json:"height" db:"height"`
|
||||
Sender string `json:"sender" db:"sender"`
|
||||
Target string `json:"target" db:"target"`
|
||||
Amount string `json:"amount" db:"amount"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Layer1Token string `json:"layer1_token" db:"layer1_token"`
|
||||
Layer2Token string `json:"layer2_token" db:"layer2_token"`
|
||||
TokenIDs []string `json:"token_ids" db:"token_ids"`
|
||||
TokenAmounts []string `json:"token_amounts" db:"token_amounts"`
|
||||
Asset int `json:"asset" db:"asset"`
|
||||
MsgType int `json:"msg_type" db:"msg_type"`
|
||||
Timestamp *time.Time `json:"timestamp" db:"block_timestamp"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
|
||||
}
|
||||
|
||||
// L1CrossMsgOrm provides operations on l1_cross_message table
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@@ -23,7 +22,7 @@ func NewL1CrossMsgOrm(db *sqlx.DB) L1CrossMsgOrm {
|
||||
|
||||
func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, error) {
|
||||
result := &CrossMsg{}
|
||||
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND NOT is_deleted;`, l1Hash.String(), Layer1Msg)
|
||||
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer1_hash = $1 AND msg_type = $2 AND deleted_at IS NULL;`, l1Hash.String(), Layer1Msg)
|
||||
if err := row.StructScan(result); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
@@ -37,7 +36,7 @@ func (l *l1CrossMsgOrm) GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, erro
|
||||
// Warning: return empty slice if no data found
|
||||
func (l *l1CrossMsgOrm) GetL1CrossMsgsByAddress(sender common.Address) ([]*CrossMsg, error) {
|
||||
var results []*CrossMsg
|
||||
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND NOT is_deleted;`, sender.String(), Layer1Msg)
|
||||
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = 1 AND deleted_at IS NULL;`, sender.String(), Layer1Msg)
|
||||
|
||||
for rows.Next() {
|
||||
msg := &CrossMsg{}
|
||||
@@ -69,19 +68,11 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
|
||||
"layer1_hash": msg.Layer1Hash,
|
||||
"layer1_token": msg.Layer1Token,
|
||||
"layer2_token": msg.Layer2Token,
|
||||
"token_id": msg.TokenID,
|
||||
"token_ids": msg.TokenIDs,
|
||||
"msg_type": Layer1Msg,
|
||||
}
|
||||
var exists bool
|
||||
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer1_hash = $1 AND NOT is_deleted)`, msg.Layer1Hash).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("BatchInsertL1CrossMsgDBTx: l1 cross msg layer1Hash %v already exists at height %v", msg.Layer1Hash, msg.Height)
|
||||
}
|
||||
}
|
||||
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
|
||||
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
|
||||
if err != nil {
|
||||
log.Error("BatchInsertL1CrossMsgDBTx: failed to insert l1 cross msgs", "err", err)
|
||||
return err
|
||||
@@ -92,7 +83,7 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
|
||||
|
||||
// UpdateL1CrossMsgHashDBTx update l1 cross msg hash in db, no need to check msg_type since layer1_hash wont be empty if its layer1 msg
|
||||
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l1Hash, msgHash common.Hash) error {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -100,7 +91,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx
|
||||
}
|
||||
|
||||
func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHash common.Hash) error {
|
||||
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND NOT is_deleted;"), msgHash.String(), l1Hash.String()); err != nil {
|
||||
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.l1_cross_message set msg_hash = ? where layer1_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l1Hash.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -108,7 +99,7 @@ func (l *l1CrossMsgOrm) UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHas
|
||||
}
|
||||
|
||||
func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer1Msg)
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer1Msg)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -123,21 +114,21 @@ func (l *l1CrossMsgOrm) GetLatestL1ProcessedHeight() (int64, error) {
|
||||
}
|
||||
|
||||
func (l *l1CrossMsgOrm) DeleteL1CrossMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET is_deleted = true WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET deleted_at = current_timestamp WHERE height > $1 AND msg_type = $2;`, height, Layer1Msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l1CrossMsgOrm) UpdateL1BlockTimestamp(height uint64, timestamp time.Time) error {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer1Msg); err != nil {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer1Msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l1CrossMsgOrm) GetL1EarliestNoBlockTimestampHeight() (uint64, error) {
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer1Msg)
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer1Msg)
|
||||
var result uint64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@@ -23,7 +22,7 @@ func NewL2CrossMsgOrm(db *sqlx.DB) L2CrossMsgOrm {
|
||||
|
||||
func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, error) {
|
||||
result := &CrossMsg{}
|
||||
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted;`, l2Hash.String())
|
||||
row := l.db.QueryRowx(`SELECT * FROM cross_message WHERE layer2_hash = $1 AND deleted_at IS NULL;`, l2Hash.String())
|
||||
if err := row.StructScan(result); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
@@ -37,7 +36,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, erro
|
||||
// Warning: return empty slice if no data found
|
||||
func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossMsg, error) {
|
||||
var results []*CrossMsg
|
||||
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND NOT is_deleted;`, sender.String(), Layer2Msg)
|
||||
rows, err := l.db.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND msg_type = $2 AND deleted_at IS NULL;`, sender.String(), Layer2Msg)
|
||||
|
||||
for rows.Next() {
|
||||
msg := &CrossMsg{}
|
||||
@@ -56,7 +55,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossM
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) DeleteL2CrossMsgFromHeightDBTx(dbTx *sqlx.Tx, height int64) error {
|
||||
_, err := dbTx.Exec(`UPDATE cross_message SET is_deleted = true where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
|
||||
_, err := dbTx.Exec(`UPDATE cross_message SET deleted_at = current_timestamp where height > $1 AND msg_type = $2 ;`, height, Layer2Msg)
|
||||
if err != nil {
|
||||
log.Error("DeleteL1CrossMsgAfterHeightDBTx: failed to delete", "height", height, "err", err)
|
||||
return err
|
||||
@@ -81,20 +80,12 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
|
||||
"layer2_hash": msg.Layer2Hash,
|
||||
"layer1_token": msg.Layer1Token,
|
||||
"layer2_token": msg.Layer2Token,
|
||||
"token_id": msg.TokenID,
|
||||
"token_ids": msg.TokenIDs,
|
||||
"amount": msg.Amount,
|
||||
"msg_type": Layer2Msg,
|
||||
}
|
||||
var exists bool
|
||||
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted)`, msg.Layer2Hash).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("BatchInsertL2CrossMsgDBTx: l2 cross msg layer2Hash %v already exists at height %v", msg.Layer2Hash, msg.Height)
|
||||
}
|
||||
}
|
||||
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps)
|
||||
_, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_ids, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_ids, :amount, :msg_type);`, messageMaps)
|
||||
if err != nil {
|
||||
log.Error("BatchInsertL2CrossMsgDBTx: failed to insert l2 cross msgs", "err", err)
|
||||
return err
|
||||
@@ -103,21 +94,21 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l2Hash, msgHash common.Hash) error {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) UpdateL2CrossMsgHash(ctx context.Context, l2Hash, msgHash common.Hash) error {
|
||||
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update public.cross_message set msg_hash = ? where layer2_hash = ? AND NOT is_deleted;"), msgHash.String(), l2Hash.String()); err != nil {
|
||||
if _, err := l.db.ExecContext(ctx, l.db.Rebind("update cross_message set msg_hash = ? where layer2_hash = ? AND deleted_at IS NULL;"), msgHash.String(), l2Hash.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND NOT is_deleted ORDER BY id DESC LIMIT 1;`, Layer2Msg)
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE msg_type = $1 AND deleted_at IS NULL ORDER BY id DESC LIMIT 1;`, Layer2Msg)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -132,14 +123,14 @@ func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) {
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) UpdateL2BlockTimestamp(height uint64, timestamp time.Time) error {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer2Msg); err != nil {
|
||||
if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND deleted_at IS NULL`, timestamp, height, Layer2Msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l2CrossMsgOrm) GetL2EarliestNoBlockTimestampHeight() (uint64, error) {
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer2Msg)
|
||||
row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND deleted_at IS NULL ORDER BY height ASC LIMIT 1;`, Layer2Msg)
|
||||
var result uint64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
|
||||
@@ -3,7 +3,6 @@ package orm
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
@@ -12,6 +11,7 @@ import (
|
||||
|
||||
type L2SentMsg struct {
|
||||
ID uint64 `json:"id" db:"id"`
|
||||
TxSender string `json:"tx_sender" db:"tx_sender"`
|
||||
MsgHash string `json:"msg_hash" db:"msg_hash"`
|
||||
Sender string `json:"sender" db:"sender"`
|
||||
Target string `json:"target" db:"target"`
|
||||
@@ -21,7 +21,6 @@ type L2SentMsg struct {
|
||||
BatchIndex uint64 `json:"batch_index" db:"batch_index"`
|
||||
MsgProof string `json:"msg_proof" db:"msg_proof"`
|
||||
MsgData string `json:"msg_data" db:"msg_data"`
|
||||
IsDeleted bool `json:"is_deleted" db:"is_deleted"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
|
||||
@@ -38,7 +37,7 @@ func NewL2SentMsgOrm(db *sqlx.DB) L2SentMsgOrm {
|
||||
|
||||
func (l *l2SentMsgOrm) GetL2SentMsgByHash(msgHash string) (*L2SentMsg, error) {
|
||||
result := &L2SentMsg{}
|
||||
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msgHash)
|
||||
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msgHash)
|
||||
if err := row.StructScan(result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -53,6 +52,7 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
|
||||
messageMaps := make([]map[string]interface{}, len(messages))
|
||||
for i, msg := range messages {
|
||||
messageMaps[i] = map[string]interface{}{
|
||||
"tx_sender": msg.TxSender,
|
||||
"sender": msg.Sender,
|
||||
"target": msg.Target,
|
||||
"value": msg.Value,
|
||||
@@ -63,25 +63,17 @@ func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2Sen
|
||||
"msg_proof": msg.MsgProof,
|
||||
"msg_data": msg.MsgData,
|
||||
}
|
||||
var exists bool
|
||||
err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM l2_sent_msg WHERE (msg_hash = $1 OR nonce = $2) AND NOT is_deleted)`, msg.MsgHash, msg.Nonce).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists {
|
||||
return fmt.Errorf("BatchInsertL2SentMsgDBTx: l2 sent msg_hash %v already exists at height %v", msg.MsgHash, msg.Height)
|
||||
}
|
||||
}
|
||||
_, err = dbTx.NamedExec(`insert into l2_sent_msg(sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
|
||||
_, err = dbTx.NamedExec(`insert into l2_sent_msg(tx_sender, sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:tx_sender, :sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps)
|
||||
if err != nil {
|
||||
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "msg_Hash", "err", err)
|
||||
log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "err", err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
|
||||
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE NOT is_deleted ORDER BY nonce DESC LIMIT 1;`)
|
||||
row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE deleted_at IS NULL ORDER BY nonce DESC LIMIT 1;`)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -96,14 +88,14 @@ func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) {
|
||||
}
|
||||
|
||||
func (l *l2SentMsgOrm) UpdateL2MessageProofInDBTx(ctx context.Context, dbTx *sqlx.Tx, msgHash string, proof string, batch_index uint64) error {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND NOT is_deleted;"), proof, batch_index, msgHash); err != nil {
|
||||
if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND deleted_at IS NULL;"), proof, batch_index, msgHash); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
|
||||
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND NOT is_deleted ORDER BY batch_index DESC LIMIT 1;`)
|
||||
row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND deleted_at IS NULL ORDER BY batch_index DESC LIMIT 1;`)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -119,7 +111,7 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) {
|
||||
|
||||
func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight uint64) ([]*L2SentMsg, error) {
|
||||
var results []*L2SentMsg
|
||||
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND NOT is_deleted ORDER BY nonce ASC;`, startHeight, endHeight)
|
||||
rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND deleted_at IS NULL ORDER BY nonce ASC;`, startHeight, endHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -135,7 +127,7 @@ func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight u
|
||||
|
||||
func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error) {
|
||||
result := &L2SentMsg{}
|
||||
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND NOT is_deleted;`, nonce)
|
||||
row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND deleted_at IS NULL;`, nonce)
|
||||
err := row.StructScan(result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -145,7 +137,7 @@ func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error)
|
||||
|
||||
func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2SentMsg, error) {
|
||||
result := &L2SentMsg{}
|
||||
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND NOT is_deleted order by nonce desc limit 1`, endBlockNumber)
|
||||
row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND deleted_at IS NULL order by nonce desc limit 1`, endBlockNumber)
|
||||
err := row.StructScan(result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -154,6 +146,6 @@ func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2Sen
|
||||
}
|
||||
|
||||
func (l *l2SentMsgOrm) DeleteL2SentMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
|
||||
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET is_deleted = true WHERE height > $1;`, height)
|
||||
_, err := dbTx.Exec(`UPDATE l2_sent_msg SET deleted_at = current_timestamp WHERE height > $1;`, height)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -8,6 +8,13 @@ import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type RelayedMsg struct {
|
||||
MsgHash string `json:"msg_hash" db:"msg_hash"`
|
||||
Height uint64 `json:"height" db:"height"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
}
|
||||
|
||||
type relayedMsgOrm struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
@@ -33,7 +40,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
|
||||
}
|
||||
_, err = dbTx.NamedExec(`insert into relayed_msg(msg_hash, height, layer1_hash, layer2_hash) values(:msg_hash, :height, :layer1_hash, :layer2_hash);`, messageMaps)
|
||||
if err != nil {
|
||||
log.Error("BatchInsertRelayedMsgDBTx: failed to insert l1 cross msgs", "msg_Hashe", "err", err)
|
||||
log.Error("BatchInsertRelayedMsgDBTx: failed to insert relayed msgs", "err", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -41,7 +48,7 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel
|
||||
|
||||
func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error) {
|
||||
result := &RelayedMsg{}
|
||||
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msg_hash)
|
||||
row := l.db.QueryRowx(`SELECT msg_hash, height, layer1_hash, layer2_hash FROM relayed_msg WHERE msg_hash = $1 AND deleted_at IS NULL;`, msg_hash)
|
||||
if err := row.StructScan(result); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
@@ -52,7 +59,7 @@ func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error
|
||||
}
|
||||
|
||||
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
|
||||
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
|
||||
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer1_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -67,7 +74,7 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL1() (int64, error) {
|
||||
}
|
||||
|
||||
func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
|
||||
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND NOT is_deleted ORDER BY height DESC LIMIT 1;`)
|
||||
row := l.db.QueryRow(`SELECT height FROM relayed_msg WHERE layer2_hash != '' AND deleted_at IS NULL ORDER BY height DESC LIMIT 1;`)
|
||||
var result sql.NullInt64
|
||||
if err := row.Scan(&result); err != nil {
|
||||
if err == sql.ErrNoRows || !result.Valid {
|
||||
@@ -82,11 +89,11 @@ func (l *relayedMsgOrm) GetLatestRelayedHeightOnL2() (int64, error) {
|
||||
}
|
||||
|
||||
func (l *relayedMsgOrm) DeleteL1RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
|
||||
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer1_hash != '';`, height)
|
||||
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer1_hash != '';`, height)
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *relayedMsgOrm) DeleteL2RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error {
|
||||
_, err := dbTx.Exec(`UPDATE relayed_msg SET is_deleted = true WHERE height > $1 AND layer2_hash != '';`, height)
|
||||
_, err := dbTx.Exec(`UPDATE relayed_msg SET deleted_at = current_timestamp WHERE height > $1 AND layer2_hash != '';`, height)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func (o *ormFactory) Beginx() (*sqlx.Tx, error) {
|
||||
|
||||
func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, error) {
|
||||
var count uint64
|
||||
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND NOT is_deleted;`, sender)
|
||||
row := o.DB.QueryRowx(`SELECT COUNT(*) FROM cross_message WHERE sender = $1 AND deleted_at IS NULL;`, sender)
|
||||
if err := row.Scan(&count); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -78,7 +78,7 @@ func (o *ormFactory) GetTotalCrossMsgCountByAddress(sender string) (uint64, erro
|
||||
func (o *ormFactory) GetCrossMsgsByAddressWithOffset(sender string, offset int64, limit int64) ([]*orm.CrossMsg, error) {
|
||||
para := sender
|
||||
var results []*orm.CrossMsg
|
||||
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND NOT is_deleted ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
|
||||
rows, err := o.DB.Queryx(`SELECT * FROM cross_message WHERE sender = $1 AND deleted_at IS NULL ORDER BY block_timestamp DESC NULLS FIRST, id DESC LIMIT $2 OFFSET $3;`, para, limit, offset)
|
||||
if err != nil || rows == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func GetCrossTxClaimInfo(msgHash string, db db.OrmFactory) *UserClaimInfo {
|
||||
Value: l2sentMsg.Value,
|
||||
Nonce: strconv.FormatUint(l2sentMsg.Nonce, 10),
|
||||
Message: l2sentMsg.MsgData,
|
||||
Proof: l2sentMsg.MsgProof,
|
||||
Proof: "0x" + l2sentMsg.MsgProof,
|
||||
BatchHash: batch.BatchHash,
|
||||
BatchIndex: strconv.FormatUint(l2sentMsg.BatchIndex, 10),
|
||||
}
|
||||
|
||||
@@ -18,6 +18,11 @@ type MsgHashWrapper struct {
|
||||
TxHash common.Hash
|
||||
}
|
||||
|
||||
type L2SentMsgWrapper struct {
|
||||
L2SentMsg *orm.L2SentMsg
|
||||
TxHash common.Hash
|
||||
}
|
||||
|
||||
type CachedParsedTxCalldata struct {
|
||||
CallDataIndex uint64
|
||||
BatchIndices []uint64
|
||||
@@ -81,7 +86,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
Layer1Hash: vlog.TxHash.Hex(),
|
||||
Layer1Token: event.L1Token.Hex(),
|
||||
Layer2Token: event.L2Token.Hex(),
|
||||
TokenID: event.TokenID.Uint64(),
|
||||
TokenIDs: []string{event.TokenID.String()},
|
||||
})
|
||||
case backendabi.L1DepositERC1155Sig:
|
||||
event := backendabi.ERC1155MessageEvent{}
|
||||
@@ -98,7 +103,7 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
Layer1Hash: vlog.TxHash.Hex(),
|
||||
Layer1Token: event.L1Token.Hex(),
|
||||
Layer2Token: event.L2Token.Hex(),
|
||||
TokenID: event.TokenID.Uint64(),
|
||||
TokenIDs: []string{event.TokenID.String()},
|
||||
Amount: event.Amount.String(),
|
||||
})
|
||||
case backendabi.L1SentMessageEventSignature:
|
||||
@@ -131,15 +136,14 @@ func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
return l1CrossMsg, msgHashes, relayedMsgs, nil
|
||||
}
|
||||
|
||||
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrapper, []*orm.RelayedMsg, []*orm.L2SentMsg, error) {
|
||||
func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []*orm.RelayedMsg, []L2SentMsgWrapper, error) {
|
||||
// Need use contract abi to parse event Log
|
||||
// Can only be tested after we have our contracts set up
|
||||
|
||||
var l2CrossMsg []*orm.CrossMsg
|
||||
// this is use to confirm finalized l1 msg
|
||||
var relayedMsgs []*orm.RelayedMsg
|
||||
var l2SentMsg []*orm.L2SentMsg
|
||||
var msgHashes []MsgHashWrapper
|
||||
var l2SentMsg []L2SentMsgWrapper
|
||||
for _, vlog := range logs {
|
||||
switch vlog.Topics[0] {
|
||||
case backendabi.L2WithdrawETHSig:
|
||||
@@ -147,7 +151,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
err := UnpackLog(backendabi.L2ETHGatewayABI, &event, "WithdrawETH", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack WithdrawETH event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
|
||||
Height: vlog.BlockNumber,
|
||||
@@ -162,7 +166,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
err := UnpackLog(backendabi.L2StandardERC20GatewayABI, &event, "WithdrawERC20", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack WithdrawERC20 event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
|
||||
Height: vlog.BlockNumber,
|
||||
@@ -179,7 +183,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
err := UnpackLog(backendabi.L2ERC721GatewayABI, &event, "WithdrawERC721", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack WithdrawERC721 event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
|
||||
Height: vlog.BlockNumber,
|
||||
@@ -189,14 +193,14 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
Layer2Hash: vlog.TxHash.Hex(),
|
||||
Layer1Token: event.L1Token.Hex(),
|
||||
Layer2Token: event.L2Token.Hex(),
|
||||
TokenID: event.TokenID.Uint64(),
|
||||
TokenIDs: []string{event.TokenID.String()},
|
||||
})
|
||||
case backendabi.L2WithdrawERC1155Sig:
|
||||
event := backendabi.ERC1155MessageEvent{}
|
||||
err := UnpackLog(backendabi.L2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack WithdrawERC1155 event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{
|
||||
Height: vlog.BlockNumber,
|
||||
@@ -206,7 +210,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
Layer2Hash: vlog.TxHash.Hex(),
|
||||
Layer1Token: event.L1Token.Hex(),
|
||||
Layer2Token: event.L2Token.Hex(),
|
||||
TokenID: event.TokenID.Uint64(),
|
||||
TokenIDs: []string{event.TokenID.String()},
|
||||
Amount: event.Amount.String(),
|
||||
})
|
||||
case backendabi.L2SentMessageEventSignature:
|
||||
@@ -214,27 +218,28 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "SentMessage", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack SentMessage event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
msgHash := ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message)
|
||||
msgHashes = append(msgHashes, MsgHashWrapper{
|
||||
MsgHash: msgHash,
|
||||
TxHash: vlog.TxHash})
|
||||
l2SentMsg = append(l2SentMsg, &orm.L2SentMsg{
|
||||
Sender: event.Sender.Hex(),
|
||||
Target: event.Target.Hex(),
|
||||
Value: event.Value.String(),
|
||||
MsgHash: msgHash.Hex(),
|
||||
Height: vlog.BlockNumber,
|
||||
Nonce: event.MessageNonce.Uint64(),
|
||||
MsgData: hexutil.Encode(event.Message),
|
||||
})
|
||||
l2SentMsg = append(l2SentMsg,
|
||||
L2SentMsgWrapper{
|
||||
TxHash: vlog.TxHash,
|
||||
L2SentMsg: &orm.L2SentMsg{
|
||||
Sender: event.Sender.Hex(),
|
||||
Target: event.Target.Hex(),
|
||||
Value: event.Value.String(),
|
||||
MsgHash: msgHash.Hex(),
|
||||
Height: vlog.BlockNumber,
|
||||
Nonce: event.MessageNonce.Uint64(),
|
||||
MsgData: hexutil.Encode(event.Message),
|
||||
},
|
||||
})
|
||||
case backendabi.L2RelayedMessageEventSignature:
|
||||
event := backendabi.L2RelayedMessageEvent{}
|
||||
err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "RelayedMessage", vlog)
|
||||
if err != nil {
|
||||
log.Warn("Failed to unpack RelayedMessage event", "err", err)
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, err
|
||||
}
|
||||
relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{
|
||||
MsgHash: event.MessageHash.String(),
|
||||
@@ -244,7 +249,7 @@ func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrappe
|
||||
|
||||
}
|
||||
}
|
||||
return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, nil
|
||||
return l2CrossMsg, relayedMsgs, l2SentMsg, nil
|
||||
}
|
||||
|
||||
func ParseBatchInfoFromScrollChain(ctx context.Context, client *ethclient.Client, logs []types.Log) ([]*orm.RollupBatch, error) {
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
var logger log.Logger
|
||||
|
||||
func init() {
|
||||
// Set up event-watcher app info.
|
||||
@@ -32,7 +33,9 @@ func init() {
|
||||
app.Flags = append(app.Flags, cutils.CommonFlags...)
|
||||
app.Commands = []*cli.Command{}
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return cutils.LogSetup(ctx)
|
||||
var err error
|
||||
logger, err = cutils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
// Register `event-watcher-test` app for integration-test.
|
||||
cutils.RegisterSimulation(app, cutils.EventWatcherApp)
|
||||
@@ -48,7 +51,7 @@ func action(ctx *cli.Context) error {
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx.Context)
|
||||
// Init db connection
|
||||
db, err := utils.InitDB(cfg.DBConfig)
|
||||
db, err := utils.InitDB(cfg.DBConfig, logger)
|
||||
if err != nil {
|
||||
log.Crit("failed to init db connection", "err", err)
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
var logger log.Logger
|
||||
|
||||
func init() {
|
||||
// Set up gas-oracle app info.
|
||||
@@ -34,7 +35,9 @@ func init() {
|
||||
app.Flags = append(app.Flags, cutils.CommonFlags...)
|
||||
app.Commands = []*cli.Command{}
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return cutils.LogSetup(ctx)
|
||||
var err error
|
||||
logger, err = cutils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
// Register `gas-oracle-test` app for integration-test.
|
||||
cutils.RegisterSimulation(app, cutils.GasOracleApp)
|
||||
@@ -49,7 +52,7 @@ func action(ctx *cli.Context) error {
|
||||
}
|
||||
subCtx, cancel := context.WithCancel(ctx.Context)
|
||||
// Init db connection
|
||||
db, err := utils.InitDB(cfg.DBConfig)
|
||||
db, err := utils.InitDB(cfg.DBConfig, logger)
|
||||
if err != nil {
|
||||
log.Crit("failed to init db connection", "err", err)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
var logger log.Logger
|
||||
|
||||
func init() {
|
||||
// Set up message-relayer app info.
|
||||
@@ -32,7 +33,9 @@ func init() {
|
||||
app.Flags = append(app.Flags, cutils.CommonFlags...)
|
||||
app.Commands = []*cli.Command{}
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return cutils.LogSetup(ctx)
|
||||
var err error
|
||||
logger, err = cutils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
// Register `message-relayer-test` app for integration-test.
|
||||
cutils.RegisterSimulation(app, cutils.MessageRelayerApp)
|
||||
@@ -48,7 +51,7 @@ func action(ctx *cli.Context) error {
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx.Context)
|
||||
// Init db connection
|
||||
db, err := utils.InitDB(cfg.DBConfig)
|
||||
db, err := utils.InitDB(cfg.DBConfig, logger)
|
||||
if err != nil {
|
||||
log.Crit("failed to init db connection", "err", err)
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
var logger log.Logger
|
||||
|
||||
func init() {
|
||||
// Set up rollup-relayer app info.
|
||||
@@ -34,7 +35,9 @@ func init() {
|
||||
app.Flags = append(app.Flags, cutils.RollupRelayerFlags...)
|
||||
app.Commands = []*cli.Command{}
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return cutils.LogSetup(ctx)
|
||||
var err error
|
||||
logger, err = cutils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
// Register `rollup-relayer-test` app for integration-test.
|
||||
cutils.RegisterSimulation(app, cutils.RollupRelayerApp)
|
||||
@@ -50,7 +53,7 @@ func action(ctx *cli.Context) error {
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx.Context)
|
||||
// Init db connection
|
||||
db, err := utils.InitDB(cfg.DBConfig)
|
||||
db, err := utils.InitDB(cfg.DBConfig, logger)
|
||||
if err != nil {
|
||||
log.Crit("failed to init db connection", "err", err)
|
||||
}
|
||||
|
||||
@@ -49,7 +49,9 @@ var (
|
||||
)
|
||||
|
||||
func setupL1RelayerDB(t *testing.T) *gorm.DB {
|
||||
db, err := bridgeUtils.InitDB(cfg.DBConfig)
|
||||
logger, err1 := utils.LogSetup(nil)
|
||||
assert.NoError(t, err1)
|
||||
db, err := bridgeUtils.InitDB(cfg.DBConfig, logger)
|
||||
assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -24,7 +24,9 @@ import (
|
||||
)
|
||||
|
||||
func setupL2RelayerDB(t *testing.T) *gorm.DB {
|
||||
db, err := bridgeUtils.InitDB(cfg.DBConfig)
|
||||
logger, err1 := utils.LogSetup(nil)
|
||||
assert.NoError(t, err1)
|
||||
db, err := bridgeUtils.InitDB(cfg.DBConfig, logger)
|
||||
assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/docker"
|
||||
cutils "scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/bridge/internal/config"
|
||||
"scroll-tech/bridge/internal/orm/migrate"
|
||||
@@ -74,7 +75,9 @@ func setupEnv(t *testing.T) (err error) {
|
||||
}
|
||||
|
||||
func setupDB(t *testing.T) *gorm.DB {
|
||||
db, err := utils.InitDB(cfg.DBConfig)
|
||||
logger, err1 := cutils.LogSetup(nil)
|
||||
assert.NoError(t, err1)
|
||||
db, err := utils.InitDB(cfg.DBConfig, logger)
|
||||
assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"scroll-tech/common/docker"
|
||||
"scroll-tech/common/types"
|
||||
cutils "scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/bridge/internal/config"
|
||||
"scroll-tech/bridge/internal/orm/migrate"
|
||||
@@ -45,6 +46,10 @@ func TestMain(m *testing.M) {
|
||||
func setupEnv(t *testing.T) {
|
||||
base = docker.NewDockerApp()
|
||||
base.RunDBImage(t)
|
||||
|
||||
logger, err1 := cutils.LogSetup(nil)
|
||||
assert.NoError(t, err1)
|
||||
|
||||
var err error
|
||||
db, err = utils.InitDB(
|
||||
&config.DBConfig{
|
||||
@@ -53,6 +58,7 @@ func setupEnv(t *testing.T) {
|
||||
MaxOpenNum: base.DBConfig.MaxOpenNum,
|
||||
MaxIdleNum: base.DBConfig.MaxIdleNum,
|
||||
},
|
||||
logger,
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
|
||||
@@ -1,17 +1,52 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
"gorm.io/gorm/utils"
|
||||
|
||||
"scroll-tech/bridge/internal/config"
|
||||
)
|
||||
|
||||
type gormLogger struct {
|
||||
gethLogger log.Logger
|
||||
}
|
||||
|
||||
func (g *gormLogger) LogMode(level logger.LogLevel) logger.Interface {
|
||||
return g
|
||||
}
|
||||
|
||||
func (g *gormLogger) Info(_ context.Context, msg string, data ...interface{}) {
|
||||
g.gethLogger.Info(msg, data)
|
||||
}
|
||||
|
||||
func (g *gormLogger) Warn(_ context.Context, msg string, data ...interface{}) {
|
||||
g.gethLogger.Warn(msg, data)
|
||||
}
|
||||
|
||||
func (g *gormLogger) Error(_ context.Context, msg string, data ...interface{}) {
|
||||
g.gethLogger.Error(msg, data)
|
||||
}
|
||||
|
||||
func (g *gormLogger) Trace(_ context.Context, begin time.Time, fc func() (string, int64), err error) {
|
||||
elapsed := time.Since(begin)
|
||||
rows, sql := fc()
|
||||
g.gethLogger.Debug("gorm", "line", utils.FileWithLineNum(), "cost", elapsed, "rows", sql, "sql", rows)
|
||||
}
|
||||
|
||||
// InitDB init the db handler
|
||||
func InitDB(config *config.DBConfig) (*gorm.DB, error) {
|
||||
func InitDB(config *config.DBConfig, gethLogger log.Logger) (*gorm.DB, error) {
|
||||
tmpGormLogger := gormLogger{
|
||||
gethLogger: gethLogger,
|
||||
}
|
||||
|
||||
db, err := gorm.Open(postgres.Open(config.DSN), &gorm.Config{
|
||||
Logger: logger.Default.LogMode(logger.Info),
|
||||
Logger: &tmpGormLogger,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/docker"
|
||||
cutils "scroll-tech/common/utils"
|
||||
|
||||
bcmd "scroll-tech/bridge/cmd"
|
||||
"scroll-tech/bridge/internal/config"
|
||||
@@ -52,7 +53,9 @@ func setupDB(t *testing.T) *gorm.DB {
|
||||
MaxOpenNum: base.DBConfig.MaxOpenNum,
|
||||
MaxIdleNum: base.DBConfig.MaxIdleNum,
|
||||
}
|
||||
db, err := utils.InitDB(cfg)
|
||||
logger, err1 := cutils.LogSetup(nil)
|
||||
assert.NoError(t, err1)
|
||||
db, err := utils.InitDB(cfg, logger)
|
||||
assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
type ethClient interface {
|
||||
BlockNumber(ctx context.Context) (uint64, error)
|
||||
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
|
||||
}
|
||||
|
||||
// GetLatestConfirmedBlockNumber get confirmed block number by rpc.BlockNumber type.
|
||||
func GetLatestConfirmedBlockNumber(ctx context.Context, client ethClient, confirm rpc.BlockNumber) (uint64, error) {
|
||||
switch true {
|
||||
case confirm == rpc.SafeBlockNumber || confirm == rpc.FinalizedBlockNumber:
|
||||
var tag *big.Int
|
||||
if confirm == rpc.FinalizedBlockNumber {
|
||||
tag = big.NewInt(int64(rpc.FinalizedBlockNumber))
|
||||
} else {
|
||||
tag = big.NewInt(int64(rpc.SafeBlockNumber))
|
||||
}
|
||||
|
||||
header, err := client.HeaderByNumber(ctx, tag)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !header.Number.IsInt64() {
|
||||
return 0, fmt.Errorf("received invalid block confirm: %v", header.Number)
|
||||
}
|
||||
return header.Number.Uint64(), nil
|
||||
case confirm == rpc.LatestBlockNumber:
|
||||
number, err := client.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return number, nil
|
||||
case confirm.Int64() >= 0: // If it's positive integer, consider it as a certain confirm value.
|
||||
number, err := client.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cfmNum := uint64(confirm.Int64())
|
||||
|
||||
if number >= cfmNum {
|
||||
return number - cfmNum, nil
|
||||
}
|
||||
return 0, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unknown confirmation type: %v", confirm)
|
||||
}
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common/math"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
tests = []struct {
|
||||
input string
|
||||
mustFail bool
|
||||
expected rpc.BlockNumber
|
||||
}{
|
||||
{`"0x"`, true, rpc.BlockNumber(0)},
|
||||
{`"0x0"`, false, rpc.BlockNumber(0)},
|
||||
{`"0X1"`, false, rpc.BlockNumber(1)},
|
||||
{`"0x00"`, true, rpc.BlockNumber(0)},
|
||||
{`"0x01"`, true, rpc.BlockNumber(0)},
|
||||
{`"0x1"`, false, rpc.BlockNumber(1)},
|
||||
{`"0x12"`, false, rpc.BlockNumber(18)},
|
||||
{`"0x7fffffffffffffff"`, false, rpc.BlockNumber(math.MaxInt64)},
|
||||
{`"0x8000000000000000"`, true, rpc.BlockNumber(0)},
|
||||
{"0", true, rpc.BlockNumber(0)},
|
||||
{`"ff"`, true, rpc.BlockNumber(0)},
|
||||
{`"safe"`, false, rpc.SafeBlockNumber},
|
||||
{`"finalized"`, false, rpc.FinalizedBlockNumber},
|
||||
{`"pending"`, false, rpc.PendingBlockNumber},
|
||||
{`"latest"`, false, rpc.LatestBlockNumber},
|
||||
{`"earliest"`, false, rpc.EarliestBlockNumber},
|
||||
{`someString`, true, rpc.BlockNumber(0)},
|
||||
{`""`, true, rpc.BlockNumber(0)},
|
||||
{``, true, rpc.BlockNumber(0)},
|
||||
}
|
||||
)
|
||||
|
||||
func TestUnmarshalJSON(t *testing.T) {
|
||||
for i, test := range tests {
|
||||
var num rpc.BlockNumber
|
||||
err := json.Unmarshal([]byte(test.input), &num)
|
||||
if test.mustFail && err == nil {
|
||||
t.Errorf("Test %d should fail", i)
|
||||
continue
|
||||
}
|
||||
if !test.mustFail && err != nil {
|
||||
t.Errorf("Test %d should pass but got err: %v", i, err)
|
||||
continue
|
||||
}
|
||||
if num != test.expected {
|
||||
t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalJSON(t *testing.T) {
|
||||
for i, test := range tests {
|
||||
var num rpc.BlockNumber
|
||||
want, err := json.Marshal(test.expected)
|
||||
assert.NoError(t, err)
|
||||
if !test.mustFail {
|
||||
err = json.Unmarshal([]byte(test.input), &num)
|
||||
assert.NoError(t, err)
|
||||
got, err := json.Marshal(&num)
|
||||
assert.NoError(t, err)
|
||||
if string(want) != string(got) {
|
||||
t.Errorf("Test %d got unexpected value, want %d, got %d", i, test.expected, num)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type MockEthClient struct {
|
||||
val uint64
|
||||
}
|
||||
|
||||
func (e MockEthClient) BlockNumber(ctx context.Context) (uint64, error) {
|
||||
return e.val, nil
|
||||
}
|
||||
|
||||
func (e MockEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
|
||||
var blockNumber int64
|
||||
switch number.Int64() {
|
||||
case int64(rpc.LatestBlockNumber):
|
||||
blockNumber = int64(e.val)
|
||||
case int64(rpc.SafeBlockNumber):
|
||||
blockNumber = int64(e.val) - 6
|
||||
case int64(rpc.FinalizedBlockNumber):
|
||||
blockNumber = int64(e.val) - 12
|
||||
default:
|
||||
blockNumber = number.Int64()
|
||||
}
|
||||
if blockNumber < 0 {
|
||||
blockNumber = 0
|
||||
}
|
||||
|
||||
return &types.Header{Number: new(big.Int).SetInt64(blockNumber)}, nil
|
||||
}
|
||||
|
||||
func TestGetLatestConfirmedBlockNumber(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
client := MockEthClient{}
|
||||
|
||||
testCases := []struct {
|
||||
blockNumber uint64
|
||||
confirmation rpc.BlockNumber
|
||||
expectedResult uint64
|
||||
}{
|
||||
{5, 6, 0},
|
||||
{7, 6, 1},
|
||||
{10, 2, 8},
|
||||
{0, 1, 0},
|
||||
{3, 0, 3},
|
||||
{15, 15, 0},
|
||||
{16, rpc.SafeBlockNumber, 10},
|
||||
{22, rpc.FinalizedBlockNumber, 10},
|
||||
{10, rpc.LatestBlockNumber, 10},
|
||||
{5, rpc.SafeBlockNumber, 0},
|
||||
{11, rpc.FinalizedBlockNumber, 0},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
client.val = testCase.blockNumber
|
||||
confirmed, err := GetLatestConfirmedBlockNumber(ctx, &client, testCase.confirmation)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, testCase.expectedResult, confirmed)
|
||||
}
|
||||
}
|
||||
@@ -1,65 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
|
||||
bridgeabi "scroll-tech/bridge/abi"
|
||||
)
|
||||
|
||||
// Keccak2 compute the keccack256 of two concatenations of bytes32
|
||||
func Keccak2(a common.Hash, b common.Hash) common.Hash {
|
||||
return common.BytesToHash(crypto.Keccak256(append(a.Bytes()[:], b.Bytes()[:]...)))
|
||||
}
|
||||
|
||||
// ComputeMessageHash compute the message hash
|
||||
func ComputeMessageHash(
|
||||
sender common.Address,
|
||||
target common.Address,
|
||||
value *big.Int,
|
||||
messageNonce *big.Int,
|
||||
message []byte,
|
||||
) common.Hash {
|
||||
data, _ := bridgeabi.L2ScrollMessengerABI.Pack("relayMessage", sender, target, value, messageNonce, message)
|
||||
return common.BytesToHash(crypto.Keccak256(data))
|
||||
}
|
||||
|
||||
// BufferToUint256Le convert bytes array to uint256 array assuming little-endian
|
||||
func BufferToUint256Le(buffer []byte) []*big.Int {
|
||||
buffer256 := make([]*big.Int, len(buffer)/32)
|
||||
for i := 0; i < len(buffer)/32; i++ {
|
||||
v := big.NewInt(0)
|
||||
shft := big.NewInt(1)
|
||||
for j := 0; j < 32; j++ {
|
||||
v = new(big.Int).Add(v, new(big.Int).Mul(shft, big.NewInt(int64(buffer[i*32+j]))))
|
||||
shft = new(big.Int).Mul(shft, big.NewInt(256))
|
||||
}
|
||||
buffer256[i] = v
|
||||
}
|
||||
return buffer256
|
||||
}
|
||||
|
||||
// UnpackLog unpacks a retrieved log into the provided output structure.
|
||||
// @todo: add unit test.
|
||||
func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error {
|
||||
if log.Topics[0] != c.Events[event].ID {
|
||||
return fmt.Errorf("event signature mismatch")
|
||||
}
|
||||
if len(log.Data) > 0 {
|
||||
if err := c.UnpackIntoInterface(out, event, log.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var indexed abi.Arguments
|
||||
for _, arg := range c.Events[event].Inputs {
|
||||
if arg.Indexed {
|
||||
indexed = append(indexed, arg)
|
||||
}
|
||||
}
|
||||
return abi.ParseTopics(out, indexed, log.Topics[1:])
|
||||
}
|
||||
@@ -1,49 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestKeccak2(t *testing.T) {
|
||||
hash := Keccak2(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
|
||||
if hash != common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5") {
|
||||
t.Fatalf("Invalid keccak, want %s, got %s", "0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5", hash.Hex())
|
||||
}
|
||||
|
||||
hash = Keccak2(common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"), common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"))
|
||||
if hash != common.HexToHash("0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30") {
|
||||
t.Fatalf("Invalid keccak, want %s, got %s", "0xb4c11951957c6f8f642c4af61cd6b24640fec6dc7fc607ee8206a99e92410d30", hash.Hex())
|
||||
}
|
||||
|
||||
hash = Keccak2(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"), common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002"))
|
||||
if hash != common.HexToHash("0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0") {
|
||||
t.Fatalf("Invalid keccak, want %s, got %s", "0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0", hash.Hex())
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeMessageHash(t *testing.T) {
|
||||
hash := ComputeMessageHash(
|
||||
common.HexToAddress("0x1C5A77d9FA7eF466951B2F01F724BCa3A5820b63"),
|
||||
common.HexToAddress("0x4592D8f8D7B001e72Cb26A73e4Fa1806a51aC79d"),
|
||||
big.NewInt(0),
|
||||
big.NewInt(1),
|
||||
[]byte("testbridgecontract"),
|
||||
)
|
||||
assert.Equal(t, "0xda253c04595a49017bb54b1b46088c69752b5ad2f0c47971ac76b8b25abec202", hash.String())
|
||||
}
|
||||
|
||||
func TestBufferToUint256Le(t *testing.T) {
|
||||
input := []byte{
|
||||
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
}
|
||||
expectedOutput := []*big.Int{big.NewInt(1)}
|
||||
result := BufferToUint256Le(input)
|
||||
assert.Equal(t, expectedOutput, result)
|
||||
}
|
||||
@@ -5,8 +5,6 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
// L1BlockStatus represents current l1 block processing status
|
||||
@@ -161,11 +159,18 @@ type RollerStatus struct {
|
||||
|
||||
// SessionInfo is assigned rollers info of a block batch (session)
|
||||
type SessionInfo struct {
|
||||
ID string `json:"id"`
|
||||
Rollers map[string]*RollerStatus `json:"rollers"`
|
||||
StartTimestamp int64 `json:"start_timestamp"`
|
||||
Attempts uint8 `json:"attempts,omitempty"`
|
||||
ProveType message.ProveType `json:"prove_type,omitempty"`
|
||||
ID int `json:"id" db:"id"`
|
||||
TaskID string `json:"task_id" db:"task_id"`
|
||||
RollerPublicKey string `json:"roller_public_key" db:"roller_public_key"`
|
||||
ProveType int16 `json:"prove_type" db:"prove_type"`
|
||||
RollerName string `json:"roller_name" db:"roller_name"`
|
||||
ProvingStatus int16 `json:"proving_status" db:"proving_status"`
|
||||
FailureType int16 `json:"failure_type" db:"failure_type"`
|
||||
Reward uint64 `json:"reward" db:"reward"`
|
||||
Proof []byte `json:"proof" db:"proof"`
|
||||
CreatedAt *time.Time `json:"created_at" db:"created_at"`
|
||||
UpdatedAt *time.Time `json:"updated_at" db:"updated_at"`
|
||||
DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"`
|
||||
}
|
||||
|
||||
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
// LogSetup is for setup logger
|
||||
func LogSetup(ctx *cli.Context) error {
|
||||
func LogSetup(ctx *cli.Context) (log.Logger, error) {
|
||||
var ostream log.Handler
|
||||
if logFile := ctx.String(LogFileFlag.Name); len(logFile) > 0 {
|
||||
fp, err := os.OpenFile(filepath.Clean(logFile), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
||||
@@ -38,6 +38,7 @@ func LogSetup(ctx *cli.Context) error {
|
||||
glogger := log.NewGlogHandler(ostream)
|
||||
// Set log level
|
||||
glogger.Verbosity(log.Lvl(ctx.Int(VerbosityFlag.Name)))
|
||||
log.Root().SetHandler(glogger)
|
||||
return nil
|
||||
logger := log.Root()
|
||||
logger.SetHandler(glogger)
|
||||
return logger, nil
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v4.0.6"
|
||||
var tag = "v4.0.7"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -51,10 +51,12 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
|
||||
PublicKey: pk,
|
||||
}
|
||||
for id, sess := range m.sessions {
|
||||
if _, ok := sess.info.Rollers[pk]; ok {
|
||||
info.ActiveSessionStartTime = time.Unix(sess.info.StartTimestamp, 0)
|
||||
info.ActiveSession = id
|
||||
break
|
||||
for _, sessionInfo := range sess.sessionInfos {
|
||||
if sessionInfo.RollerPublicKey == pk {
|
||||
info.ActiveSessionStartTime = *sessionInfo.CreatedAt
|
||||
info.ActiveSession = id
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
res = append(res, info)
|
||||
@@ -66,14 +68,14 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
|
||||
func newSessionInfo(sess *session, status types.ProvingStatus, errMsg string, finished bool) *SessionInfo {
|
||||
now := time.Now()
|
||||
var nameList []string
|
||||
for pk := range sess.info.Rollers {
|
||||
nameList = append(nameList, sess.info.Rollers[pk].Name)
|
||||
for _, sessionInfo := range sess.sessionInfos {
|
||||
nameList = append(nameList, sessionInfo.RollerName)
|
||||
}
|
||||
info := SessionInfo{
|
||||
ID: sess.info.ID,
|
||||
ID: sess.taskID,
|
||||
Status: status.String(),
|
||||
AssignedRollers: nameList,
|
||||
StartTime: time.Unix(sess.info.StartTimestamp, 0),
|
||||
StartTime: *sess.sessionInfos[0].CreatedAt,
|
||||
Error: errMsg,
|
||||
}
|
||||
if finished {
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"os/signal"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
@@ -36,7 +35,8 @@ func init() {
|
||||
app.Flags = append(app.Flags, apiFlags...)
|
||||
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return utils.LogSetup(ctx)
|
||||
_, err := utils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// Register `coordinator-test` app for integration-test.
|
||||
|
||||
@@ -57,7 +57,8 @@ type rollerProofStatus struct {
|
||||
|
||||
// Contains all the information on an ongoing proof generation session.
|
||||
type session struct {
|
||||
info *types.SessionInfo
|
||||
taskID string
|
||||
sessionInfos []*types.SessionInfo
|
||||
// finish channel is used to pass the public key of the rollers who finished proving process.
|
||||
finishChan chan rollerProofStatus
|
||||
}
|
||||
@@ -248,24 +249,21 @@ func (m *Manager) restorePrevSessions() {
|
||||
log.Error("failed to recover roller session info from db", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
sessionInfosMaps := make(map[string][]*types.SessionInfo)
|
||||
for _, v := range prevSessions {
|
||||
log.Info("restore roller info for session", "session start time", v.CreatedAt, "session id", v.TaskID, "roller name",
|
||||
v.RollerName, "prove type", v.ProveType, "public key", v.RollerPublicKey, "proof status", v.ProvingStatus)
|
||||
sessionInfosMaps[v.TaskID] = append(sessionInfosMaps[v.TaskID], v)
|
||||
}
|
||||
|
||||
for taskID, sessionInfos := range sessionInfosMaps {
|
||||
sess := &session{
|
||||
info: v,
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
taskID: taskID,
|
||||
sessionInfos: sessionInfos,
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
m.sessions[sess.info.ID] = sess
|
||||
|
||||
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Info(
|
||||
"restore roller info for session",
|
||||
"session id", sess.info.ID,
|
||||
"roller name", roller.Name,
|
||||
"prove type", sess.info.ProveType,
|
||||
"public key", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
|
||||
m.sessions[taskID] = sess
|
||||
go m.CollectProofs(sess)
|
||||
}
|
||||
|
||||
@@ -287,36 +285,40 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
|
||||
}
|
||||
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
|
||||
|
||||
var tmpSessionInfo *types.SessionInfo
|
||||
for _, si := range sess.sessionInfos {
|
||||
// get the send session info of this proof msg
|
||||
if si.TaskID == msg.ID && si.RollerPublicKey == pk {
|
||||
tmpSessionInfo = si
|
||||
}
|
||||
}
|
||||
|
||||
if tmpSessionInfo == nil {
|
||||
return fmt.Errorf("proof generation session for id %v pk:%s does not existID", msg.ID, pk)
|
||||
}
|
||||
|
||||
proofTime := time.Since(*tmpSessionInfo.CreatedAt)
|
||||
proofTimeSec := uint64(proofTime.Seconds())
|
||||
|
||||
// Ensure this roller is eligible to participate in the session.
|
||||
roller, ok := sess.info.Rollers[pk]
|
||||
if !ok {
|
||||
return fmt.Errorf("roller %s %s (%s) is not eligible to partake in proof session %v", roller.Name, sess.info.ProveType, roller.PublicKey, msg.ID)
|
||||
}
|
||||
if roller.Status == types.RollerProofValid {
|
||||
if types.RollerProveStatus(tmpSessionInfo.ProvingStatus) == types.RollerProofValid {
|
||||
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
|
||||
// TODO: Defend invalid proof resubmissions by one of the following two methods:
|
||||
// (i) slash the roller for each submission of invalid proof
|
||||
// (ii) set the maximum failure retry times
|
||||
log.Warn(
|
||||
"roller has already submitted valid proof in proof session",
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"prove type", sess.info.ProveType,
|
||||
"roller name", tmpSessionInfo.RollerName,
|
||||
"roller pk", tmpSessionInfo.RollerPublicKey,
|
||||
"prove type", tmpSessionInfo.ProveType,
|
||||
"proof id", msg.ID,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
log.Info(
|
||||
"handling zk proof",
|
||||
"proof id", msg.ID,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"prove type", sess.info.ProveType,
|
||||
"proof time", proofTimeSec,
|
||||
)
|
||||
|
||||
log.Info("handling zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName, "roller pk",
|
||||
tmpSessionInfo.RollerPublicKey, "prove type", tmpSessionInfo.ProveType, "proof time", proofTimeSec)
|
||||
|
||||
defer func() {
|
||||
// TODO: maybe we should use db tx for the whole process?
|
||||
@@ -344,12 +346,12 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
|
||||
if msg.Status != message.StatusOk {
|
||||
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
|
||||
m.updateMetricRollerProofsGeneratedFailedTimeTimer(roller.PublicKey, proofTime)
|
||||
m.updateMetricRollerProofsGeneratedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
|
||||
log.Info(
|
||||
"proof generated by roller failed",
|
||||
"proof id", msg.ID,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"roller name", tmpSessionInfo.RollerName,
|
||||
"roller pk", tmpSessionInfo.RollerPublicKey,
|
||||
"prove type", msg.Type,
|
||||
"proof time", proofTimeSec,
|
||||
"error", msg.Error,
|
||||
@@ -383,8 +385,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
if verifyErr != nil {
|
||||
// TODO: this is only a temp workaround for testnet, we should return err in real cases
|
||||
success = false
|
||||
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
|
||||
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
// TODO: Roller needs to be slashed if proof is invalid.
|
||||
}
|
||||
|
||||
@@ -411,18 +413,32 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
}
|
||||
|
||||
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
|
||||
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
|
||||
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
|
||||
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
|
||||
} else {
|
||||
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
|
||||
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
m.updateMetricRollerProofsVerifiedFailedTimeTimer(tmpSessionInfo.RollerPublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", tmpSessionInfo.RollerName,
|
||||
"roller pk", tmpSessionInfo.RollerPublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkAttempts use the count of session info to check the attempts
|
||||
func (m *Manager) checkAttemptsExceeded(hash string) bool {
|
||||
sessionInfos, err := m.orm.GetSessionInfosByHashes([]string{hash})
|
||||
if err != nil {
|
||||
log.Error("get session info error", "hash id", hash, "error", err)
|
||||
return true
|
||||
}
|
||||
|
||||
if len(sessionInfos) >= int(m.cfg.SessionAttempts) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// CollectProofs collects proofs corresponding to a proof generation session.
|
||||
func (m *Manager) CollectProofs(sess *session) {
|
||||
coordinatorSessionsActiveNumberGauge.Inc(1)
|
||||
@@ -432,48 +448,47 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
select {
|
||||
//Execute after timeout, set in config.json. Consider all rollers failed.
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
// Check if session can be replayed
|
||||
if sess.info.Attempts < m.cfg.SessionAttempts {
|
||||
if !m.checkAttemptsExceeded(sess.taskID) {
|
||||
var success bool
|
||||
if sess.info.ProveType == message.AggregatorProve {
|
||||
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
|
||||
success = m.StartAggProofGenerationSession(nil, sess)
|
||||
} else if sess.info.ProveType == message.BasicProve {
|
||||
} else if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
|
||||
success = m.StartBasicProofGenerationSession(nil, sess)
|
||||
}
|
||||
if success {
|
||||
m.mu.Lock()
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
for _, v := range sess.sessionInfos {
|
||||
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
log.Info("Retrying session", "session id:", sess.info.ID)
|
||||
log.Info("Retrying session", "session id:", sess.taskID)
|
||||
return
|
||||
}
|
||||
}
|
||||
// record failed session.
|
||||
errMsg := "proof generation session ended without receiving any valid proofs"
|
||||
m.addFailedSession(sess, errMsg)
|
||||
log.Warn(errMsg, "session id", sess.info.ID)
|
||||
log.Warn(errMsg, "session id", sess.taskID)
|
||||
// Set status as skipped.
|
||||
// Note that this is only a workaround for testnet here.
|
||||
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
|
||||
// so as to re-distribute the task in the future
|
||||
if sess.info.ProveType == message.BasicProve {
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset basic task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
if message.ProveType(sess.sessionInfos[0].ProveType) == message.BasicProve {
|
||||
if err := m.orm.UpdateProvingStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset basic task_status as Unassigned", "id", sess.taskID, "err", err)
|
||||
}
|
||||
}
|
||||
if sess.info.ProveType == message.AggregatorProve {
|
||||
if err := m.orm.UpdateAggTaskStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
if message.ProveType(sess.sessionInfos[0].ProveType) == message.AggregatorProve {
|
||||
if err := m.orm.UpdateAggTaskStatus(sess.taskID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.taskID, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
for _, v := range sess.sessionInfos {
|
||||
m.freeTaskIDForRoller(v.RollerPublicKey, v.TaskID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
delete(m.sessions, sess.taskID)
|
||||
m.mu.Unlock()
|
||||
coordinatorSessionsTimeoutTotalCounter.Inc(1)
|
||||
return
|
||||
@@ -481,7 +496,12 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
|
||||
case ret := <-sess.finishChan:
|
||||
m.mu.Lock()
|
||||
sess.info.Rollers[ret.pk].Status = ret.status
|
||||
for idx := range sess.sessionInfos {
|
||||
if sess.sessionInfos[idx].RollerPublicKey == ret.pk {
|
||||
sess.sessionInfos[idx].ProvingStatus = int16(ret.status)
|
||||
}
|
||||
}
|
||||
|
||||
if sess.isSessionFailed() {
|
||||
if ret.typ == message.BasicProve {
|
||||
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
|
||||
@@ -493,12 +513,13 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
log.Error("failed to update aggregator proving_status as failed", "msg.ID", ret.id, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
coordinatorSessionsFailedTotalCounter.Inc(1)
|
||||
}
|
||||
if err := m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
|
||||
if err := m.orm.UpdateSessionInfoProvingStatus(m.ctx, ret.typ, ret.id, ret.pk, ret.status); err != nil {
|
||||
log.Error("db set session info fail", "pk", ret.pk, "error", err)
|
||||
}
|
||||
|
||||
//Check if all rollers have finished their tasks, and rollers with valid results are indexed by public key.
|
||||
finished, validRollers := sess.isRollersFinished()
|
||||
|
||||
@@ -508,11 +529,10 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
randIndex := rand.Int63n(int64(len(validRollers)))
|
||||
_ = validRollers[randIndex]
|
||||
// TODO: reward winner
|
||||
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
for _, sessionInfo := range sess.sessionInfos {
|
||||
m.freeTaskIDForRoller(sessionInfo.RollerPublicKey, sessionInfo.TaskID)
|
||||
delete(m.sessions, sessionInfo.TaskID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
|
||||
coordinatorSessionsSuccessTotalCounter.Inc(1)
|
||||
@@ -528,14 +548,16 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
// validRollers also records the public keys of rollers who have finished their tasks correctly as index.
|
||||
func (s *session) isRollersFinished() (bool, []string) {
|
||||
var validRollers []string
|
||||
for pk, roller := range s.info.Rollers {
|
||||
if roller.Status == types.RollerProofValid {
|
||||
validRollers = append(validRollers, pk)
|
||||
for _, sessionInfo := range s.sessionInfos {
|
||||
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofValid {
|
||||
validRollers = append(validRollers, sessionInfo.RollerPublicKey)
|
||||
continue
|
||||
}
|
||||
if roller.Status == types.RollerProofInvalid {
|
||||
|
||||
if types.RollerProveStatus(sessionInfo.ProvingStatus) == types.RollerProofInvalid {
|
||||
continue
|
||||
}
|
||||
|
||||
// Some rollers are still proving.
|
||||
return false, nil
|
||||
}
|
||||
@@ -543,8 +565,8 @@ func (s *session) isRollersFinished() (bool, []string) {
|
||||
}
|
||||
|
||||
func (s *session) isSessionFailed() bool {
|
||||
for _, roller := range s.info.Rollers {
|
||||
if roller.Status != types.RollerProofInvalid {
|
||||
for _, sessionInfo := range s.sessionInfos {
|
||||
if types.RollerProveStatus(sessionInfo.ProvingStatus) != types.RollerProofInvalid {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -573,7 +595,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
|
||||
if task != nil {
|
||||
taskID = task.Hash
|
||||
} else {
|
||||
taskID = prevSession.info.ID
|
||||
taskID = prevSession.taskID
|
||||
}
|
||||
if m.GetNumberOfIdleRollers(message.BasicProve) == 0 {
|
||||
log.Warn("no idle basic roller when starting proof generation session", "id", taskID)
|
||||
@@ -612,7 +634,7 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
|
||||
}
|
||||
|
||||
// Dispatch task to basic rollers.
|
||||
rollers := make(map[string]*types.RollerStatus)
|
||||
var sessionInfos []*types.SessionInfo
|
||||
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
|
||||
roller := m.selectRoller(message.BasicProve)
|
||||
if roller == nil {
|
||||
@@ -626,10 +648,27 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
|
||||
continue
|
||||
}
|
||||
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
|
||||
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
|
||||
now := time.Now()
|
||||
tmpSessionInfo := types.SessionInfo{
|
||||
TaskID: taskID,
|
||||
RollerPublicKey: roller.PublicKey,
|
||||
ProveType: int16(message.BasicProve),
|
||||
RollerName: roller.Name,
|
||||
CreatedAt: &now,
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
}
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
|
||||
log.Error("db set session info fail", "session id", taskID, "error", err)
|
||||
return false
|
||||
}
|
||||
sessionInfos = append(sessionInfos, &tmpSessionInfo)
|
||||
log.Info("assigned proof to roller", "session id", taskID, "session type", message.BasicProve, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
|
||||
|
||||
}
|
||||
// No roller assigned.
|
||||
if len(rollers) == 0 {
|
||||
if len(sessionInfos) == 0 {
|
||||
log.Error("no roller assigned", "id", taskID, "number of idle basic rollers", m.GetNumberOfIdleRollers(message.BasicProve))
|
||||
return false
|
||||
}
|
||||
@@ -642,33 +681,9 @@ func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevS
|
||||
|
||||
// Create a proof generation session.
|
||||
sess := &session{
|
||||
info: &types.SessionInfo{
|
||||
ID: taskID,
|
||||
Rollers: rollers,
|
||||
ProveType: message.BasicProve,
|
||||
StartTimestamp: time.Now().Unix(),
|
||||
Attempts: 1,
|
||||
},
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
if prevSession != nil {
|
||||
sess.info.Attempts += prevSession.info.Attempts
|
||||
}
|
||||
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Info(
|
||||
"assigned proof to roller",
|
||||
"session id", sess.info.ID,
|
||||
"session type", sess.info.ProveType,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
|
||||
return false
|
||||
taskID: taskID,
|
||||
sessionInfos: sessionInfos,
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
@@ -685,7 +700,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
|
||||
if task != nil {
|
||||
taskID = task.ID
|
||||
} else {
|
||||
taskID = prevSession.info.ID
|
||||
taskID = prevSession.taskID
|
||||
}
|
||||
if m.GetNumberOfIdleRollers(message.AggregatorProve) == 0 {
|
||||
log.Warn("no idle common roller when starting proof generation session", "id", taskID)
|
||||
@@ -715,7 +730,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
|
||||
}
|
||||
|
||||
// Dispatch task to basic rollers.
|
||||
rollers := make(map[string]*types.RollerStatus)
|
||||
var sessionInfos []*types.SessionInfo
|
||||
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
|
||||
roller := m.selectRoller(message.AggregatorProve)
|
||||
if roller == nil {
|
||||
@@ -732,11 +747,29 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskID)
|
||||
continue
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
tmpSessionInfo := types.SessionInfo{
|
||||
TaskID: taskID,
|
||||
RollerPublicKey: roller.PublicKey,
|
||||
ProveType: int16(message.AggregatorProve),
|
||||
RollerName: roller.Name,
|
||||
CreatedAt: &now,
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
}
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(&tmpSessionInfo); err != nil {
|
||||
log.Error("db set session info fail", "session id", taskID, "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
|
||||
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
|
||||
sessionInfos = append(sessionInfos, &tmpSessionInfo)
|
||||
log.Info("assigned proof to roller", "session id", taskID, "session type", message.AggregatorProve, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "proof status", tmpSessionInfo.ProvingStatus)
|
||||
}
|
||||
// No roller assigned.
|
||||
if len(rollers) == 0 {
|
||||
if len(sessionInfos) == 0 {
|
||||
log.Error("no roller assigned", "id", taskID, "number of idle aggregator rollers", m.GetNumberOfIdleRollers(message.AggregatorProve))
|
||||
return false
|
||||
}
|
||||
@@ -749,33 +782,9 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
|
||||
|
||||
// Create a proof generation session.
|
||||
sess := &session{
|
||||
info: &types.SessionInfo{
|
||||
ID: taskID,
|
||||
Rollers: rollers,
|
||||
ProveType: message.AggregatorProve,
|
||||
StartTimestamp: time.Now().Unix(),
|
||||
Attempts: 1,
|
||||
},
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
if prevSession != nil {
|
||||
sess.info.Attempts += prevSession.info.Attempts
|
||||
}
|
||||
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Info(
|
||||
"assigned proof to roller",
|
||||
"session id", sess.info.ID,
|
||||
"session type", sess.info.ProveType,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
|
||||
return false
|
||||
taskID: taskID,
|
||||
sessionInfos: sessionInfos,
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
@@ -789,7 +798,7 @@ func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSessio
|
||||
func (m *Manager) addFailedSession(sess *session, errMsg string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
|
||||
m.failedSessionInfos[sess.taskID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
|
||||
}
|
||||
|
||||
// VerifyToken verifies pukey for token and expiration time
|
||||
|
||||
@@ -53,8 +53,8 @@ func (m *Manager) reloadRollerAssignedTasks(pubkey string) *cmap.ConcurrentMap {
|
||||
defer m.mu.RUnlock()
|
||||
taskIDs := cmap.New()
|
||||
for id, sess := range m.sessions {
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if pk == pubkey && roller.Status == types.RollerAssigned {
|
||||
for _, sessionInfo := range sess.sessionInfos {
|
||||
if sessionInfo.RollerPublicKey == pubkey && sessionInfo.ProvingStatus == int16(types.RollerAssigned) {
|
||||
taskIDs.Set(id, struct{}{})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,8 @@ func init() {
|
||||
app.Flags = append(app.Flags, utils.CommonFlags...)
|
||||
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return utils.LogSetup(ctx)
|
||||
_, err := utils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
app.Commands = []*cli.Command{
|
||||
|
||||
@@ -3,12 +3,21 @@
|
||||
|
||||
create table session_info
|
||||
(
|
||||
hash VARCHAR NOT NULL,
|
||||
rollers_info BYTEA NOT NULL
|
||||
);
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
task_id VARCHAR NOT NULL,
|
||||
roller_public_key VARCHAR NOT NULL,
|
||||
prove_type SMALLINT DEFAULT 0,
|
||||
roller_name VARCHAR NOT NULL,
|
||||
proving_status SMALLINT DEFAULT 1,
|
||||
failure_type SMALLINT DEFAULT 0,
|
||||
reward BIGINT DEFAULT 0,
|
||||
proof BYTEA DEFAULT NULL,
|
||||
created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL,
|
||||
|
||||
create unique index session_info_hash_uindex
|
||||
on session_info (hash);
|
||||
CONSTRAINT uk_session_unique UNIQUE (task_id, roller_public_key)
|
||||
);
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ type BlockTraceOrm interface {
|
||||
type SessionInfoOrm interface {
|
||||
GetSessionInfosByHashes(hashes []string) ([]*types.SessionInfo, error)
|
||||
SetSessionInfo(rollersInfo *types.SessionInfo) error
|
||||
UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error
|
||||
}
|
||||
|
||||
// AggTaskOrm is aggregator task
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package orm
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"context"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
type sessionInfoOrm struct {
|
||||
@@ -23,7 +24,7 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
|
||||
if len(hashes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
query, args, err := sqlx.In("SELECT rollers_info FROM session_info WHERE hash IN (?);", hashes)
|
||||
query, args, err := sqlx.In("SELECT * FROM session_info WHERE task_id IN (?);", hashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -35,15 +36,11 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
|
||||
|
||||
var sessionInfos []*types.SessionInfo
|
||||
for rows.Next() {
|
||||
var infoBytes []byte
|
||||
if err = rows.Scan(&infoBytes); err != nil {
|
||||
var sessionInfo types.SessionInfo
|
||||
if err = rows.StructScan(&sessionInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sessionInfo := &types.SessionInfo{}
|
||||
if err = json.Unmarshal(infoBytes, sessionInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sessionInfos = append(sessionInfos, sessionInfo)
|
||||
sessionInfos = append(sessionInfos, &sessionInfo)
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
@@ -53,11 +50,16 @@ func (o *sessionInfoOrm) GetSessionInfosByHashes(hashes []string) ([]*types.Sess
|
||||
}
|
||||
|
||||
func (o *sessionInfoOrm) SetSessionInfo(rollersInfo *types.SessionInfo) error {
|
||||
infoBytes, err := json.Marshal(rollersInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sqlStr := "INSERT INTO session_info (hash, rollers_info) VALUES ($1, $2) ON CONFLICT (hash) DO UPDATE SET rollers_info = EXCLUDED.rollers_info;"
|
||||
_, err = o.db.Exec(sqlStr, rollersInfo.ID, infoBytes)
|
||||
sqlStr := "INSERT INTO session_info (task_id, roller_public_key, prove_type, roller_name, proving_status, failure_type, reward, proof, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (task_id, roller_public_key) DO UPDATE SET proving_status = EXCLUDED.proving_status;"
|
||||
_, err := o.db.Exec(sqlStr, rollersInfo.TaskID, rollersInfo.RollerPublicKey, rollersInfo.ProveType, rollersInfo.RollerName,
|
||||
rollersInfo.ProvingStatus, rollersInfo.FailureType, rollersInfo.Reward, rollersInfo.Proof, rollersInfo.CreatedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateSessionInfoProvingStatus update the session info proving status
|
||||
func (o *sessionInfoOrm) UpdateSessionInfoProvingStatus(ctx context.Context, proveType message.ProveType, taskID string, pk string, status types.RollerProveStatus) error {
|
||||
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update session_info set proving_status = ? where prove_type = ? and task_id = ? and roller_public_key = ? ;"), int(proveType), int(status), taskID, pk); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -410,31 +410,29 @@ func testOrmSessionInfo(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, len(sessionInfos))
|
||||
|
||||
now := time.Now()
|
||||
sessionInfo := types.SessionInfo{
|
||||
ID: batchHash,
|
||||
Rollers: map[string]*types.RollerStatus{
|
||||
"0": {
|
||||
PublicKey: "0",
|
||||
Name: "roller-0",
|
||||
Status: types.RollerAssigned,
|
||||
},
|
||||
},
|
||||
StartTimestamp: time.Now().Unix()}
|
||||
TaskID: batchHash,
|
||||
RollerName: "roller-0",
|
||||
RollerPublicKey: "0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
CreatedAt: &now,
|
||||
}
|
||||
|
||||
// insert
|
||||
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
|
||||
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(sessionInfos))
|
||||
assert.Equal(t, sessionInfo, *sessionInfos[0])
|
||||
assert.Equal(t, sessionInfo.RollerName, sessionInfos[0].RollerName)
|
||||
|
||||
// update
|
||||
sessionInfo.Rollers["0"].Status = types.RollerProofValid
|
||||
sessionInfo.ProvingStatus = int16(types.RollerProofValid)
|
||||
assert.NoError(t, ormSession.SetSessionInfo(&sessionInfo))
|
||||
sessionInfos, err = ormSession.GetSessionInfosByHashes(hashes)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(sessionInfos))
|
||||
assert.Equal(t, sessionInfo, *sessionInfos[0])
|
||||
assert.Equal(t, sessionInfo.ProvingStatus, sessionInfos[0].ProvingStatus)
|
||||
|
||||
// delete
|
||||
assert.NoError(t, ormBatch.UpdateProvingStatus(batchHash, types.ProvingTaskVerified))
|
||||
|
||||
2
l2geth
2
l2geth
Submodule l2geth updated: 983d630244...127af384ed
@@ -26,7 +26,8 @@ func init() {
|
||||
app.Version = version.Version
|
||||
app.Flags = append(app.Flags, utils.CommonFlags...)
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return utils.LogSetup(ctx)
|
||||
_, err := utils.LogSetup(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// Register `roller-test` app for integration-test.
|
||||
|
||||
Reference in New Issue
Block a user