Compare commits

..

12 Commits

Author SHA1 Message Date
zimpha
45326ddd4a add forceBatchFinalization 2023-02-01 03:01:59 +08:00
Péter Garamvölgyi
16576b6f53 fix(bridge): Handle duplicate messages (#261) 2023-01-20 12:09:47 +01:00
HAOYUatHZ
aa885f068f docs(db): fix sql comments (#260) 2023-01-20 09:44:19 +08:00
HAOYUatHZ
1f764a579d refactor(bridge): refactor layer2MessageOrm.GetL2Messages() (#243) 2023-01-20 09:29:34 +08:00
HAOYUatHZ
91ee767669 doc(db): update sql comment (#259) 2023-01-20 09:27:50 +08:00
Péter Garamvölgyi
7eac41691e feat(bridge): handle expired messages correctly (#257) 2023-01-19 23:53:34 +01:00
Péter Garamvölgyi
d9516890b0 feat(bridge): handle expired messages (#256) 2023-01-19 23:21:17 +01:00
Péter Garamvölgyi
ddb96bb732 feat(bridge): add more l1 relayer logs (#255) 2023-01-19 22:37:02 +01:00
Péter Garamvölgyi
e419dd8d5c fix(bridge): add limit to GetL1MessagesByStatus (#254) 2023-01-19 22:19:48 +01:00
Péter Garamvölgyi
c99c65bdfd fix(bridge): execute watcher loops independently (#253) 2023-01-19 21:14:45 +01:00
colin
18fd7f56a8 fix(coordinator): reset roller state when proof session timeout (#210)
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-01-19 21:37:30 +08:00
Péter Garamvölgyi
a319dc1cff bugfix(bridge): only relay messages for finalized batches (#251)
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
2023-01-19 20:54:09 +08:00
15 changed files with 191 additions and 55 deletions

View File

@@ -72,15 +72,20 @@ func NewLayer1Relayer(ctx context.Context, ethClient *ethclient.Client, l1Confir
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer1Relayer) ProcessSavedEvents() {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending)
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending, 100)
if err != nil {
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
return
}
if len(msgs) > 0 {
log.Info("Processing L1 messages", "count", len(msgs))
}
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process event", "err", err)
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
}
@@ -109,6 +114,12 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
}
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
return err
}

View File

@@ -40,8 +40,7 @@ func newBatchProposer(cfg *config.BatchProposerConfig, orm database.OrmFactory)
}
}
func (w *batchProposer) tryProposeBatch(wg *sync.WaitGroup) {
defer wg.Done()
func (w *batchProposer) tryProposeBatch() {
w.mutex.Lock()
defer w.mutex.Unlock()

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"math/big"
"os"
"sync"
"testing"
"github.com/scroll-tech/go-ethereum/core/types"
@@ -50,10 +49,7 @@ func testBatchProposer(t *testing.T) {
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, db)
var wg sync.WaitGroup
wg.Add(1)
proposer.tryProposeBatch(&wg)
wg.Wait()
proposer.tryProposeBatch()
infos, err := db.GetUnbatchedBlocks(map[string]interface{}{},
fmt.Sprintf("order by number ASC LIMIT %d", 100))

View File

@@ -107,7 +107,8 @@ func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": orm.MsgPending},
fmt.Sprintf("AND height<=%d ORDER BY nonce ASC LIMIT %d", batch.EndBlockNumber, processMsgLimit),
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
@@ -166,6 +167,12 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
@@ -284,9 +291,11 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
defer wg.Done()
// set skipped batches in a single db operation
if err := r.db.UpdateSkippedBatches(); err != nil {
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"math/big"
"reflect"
"sync"
"time"
geth "github.com/scroll-tech/go-ethereum"
@@ -82,36 +81,84 @@ func (w *WatcherClient) Start() {
panic("must run L2 watcher with DB")
}
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
ctx, cancel := context.WithCancel(w.ctx)
for ; true; <-ticker.C {
select {
case <-w.stopCh:
return
// trace fetcher loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
default:
// get current height
number, err := w.BlockNumber(w.ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// get current height
number, err := w.BlockNumber(ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
w.tryFetchRunningMissingBlocks(ctx, number)
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
var wg sync.WaitGroup
wg.Add(3)
go w.tryFetchRunningMissingBlocks(w.ctx, &wg, number)
go w.FetchContractEvent(&wg, number)
go w.batchProposer.tryProposeBatch(&wg)
wg.Wait()
}
}
}(ctx)
// event fetcher loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// get current height
number, err := w.BlockNumber(ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
w.FetchContractEvent(number)
}
}
}(ctx)
// batch proposer loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.batchProposer.tryProposeBatch()
}
}
}(ctx)
<-w.stopCh
cancel()
}()
}
@@ -123,8 +170,7 @@ func (w *WatcherClient) Stop() {
const blockTracesFetchLimit = uint64(10)
// try fetch missing blocks if inconsistent
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, wg *sync.WaitGroup, blockHeight uint64) {
defer wg.Done()
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
// Get newest block in DB. must have blocks at that time.
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
@@ -181,8 +227,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
const contractEventsBlocksFetchLimit = int64(10)
// FetchContractEvent pull latest event logs from given contract address and save in DB
func (w *WatcherClient) FetchContractEvent(wg *sync.WaitGroup, blockHeight uint64) {
defer wg.Done()
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
defer func() {
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()

View File

@@ -26,7 +26,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
defer db.Close()
var wg sync.WaitGroup
wg.Add(4)
wg.Add(3)
prepareContracts(t)
@@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
}
// l2 watch process events
l2Watcher.FetchContractEvent(&wg, sendReceipt.BlockNumber.Uint64())
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
// check db status
msg, err := db.GetL2MessageByNonce(nonce.Uint64())

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "prealpha-v11.6"
var tag = "prealpha-v11.14"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -300,6 +300,20 @@ contract ZKRollup is OwnableUpgradeable, IZKRollup {
/**************************************** Restricted Functions ****************************************/
/// @notice Explicit finalize batch by owner.
/// @dev This is temporary workaround to skip batch proving in some situations.
/// And it will be removed on mainnet launch.
/// @param _batchId The id of batch to finalize.
// Owner-only escape hatch: marks a committed batch as verified without a
// validity proof (per the NatSpec above, a temporary workaround to be removed
// before mainnet launch).
function forceBatchFinalization(bytes32 _batchId) external onlyOwner {
Layer2BatchStored storage _batch = batches[_batchId];
// A zero batchHash means no batch was ever stored under this id.
require(_batch.batchHash != bytes32(0), "No such batch");
require(!_batch.verified, "Batch already verified");
_batch.verified = true;
// Emits the same FinalizeBatch event as regular finalization, so downstream
// consumers observe a normal finalization.
emit FinalizeBatch(_batchId, _batch.batchHash, _batch.batchIndex, _batch.parentHash);
}
/// @notice Update the address of operator.
/// @dev This function can only called by contract owner.
/// @param _newOperator The new operator address to update.

View File

@@ -255,4 +255,50 @@ contract ZKRollupTest is DSTestPlus {
assertEq(rollup.finalizedBatches(1), bytes32(0));
}
}
// Covers forceBatchFinalization end to end: the onlyOwner guard, the
// "No such batch" and "Batch already verified" reverts, and the success path
// that flips the stored batch's `verified` flag.
function testForceBatchFinalization() public {
// not owner: any call from a non-owner address must revert
hevm.startPrank(address(1));
hevm.expectRevert("Ownable: caller is not the owner");
rollup.forceBatchFinalization(bytes32(0));
hevm.stopPrank();
IZKRollup.Layer2BlockHeader memory _header;
// import fake genesis
_header.blockHash = bytes32(uint256(1));
rollup.importGenesisBlock(_header);
// No such batch: id zero was never stored
hevm.expectRevert("No such batch");
rollup.forceBatchFinalization(bytes32(0));
// Batch already verified: forcing the genesis batch id must revert
// (NOTE(review): presumably importGenesisBlock stores it verified — confirm
// against the contract).
bytes32 _batchId = keccak256(abi.encode(_header.blockHash, bytes32(0), 0));
hevm.expectRevert("Batch already verified");
rollup.forceBatchFinalization(_batchId);
// import fake batch: a single-block batch at index 1 chained onto genesis
_header.blockHeight = 1;
_header.parentHash = bytes32(uint256(1));
_header.blockHash = bytes32(uint256(2));
IZKRollup.Layer2Batch memory _batch;
_batch.blocks = new IZKRollup.Layer2BlockHeader[](1);
_batch.blocks[0] = _header;
_batch.batchIndex = 1;
_batch.parentHash = _header.parentHash;
// promote address(1) before committing (commitBatch appears operator-gated;
// verify against the contract)
rollup.updateOperator(address(1));
hevm.startPrank(address(1));
rollup.commitBatch(_batch);
hevm.stopPrank();
// force finalize: batch id mirrors the genesis id scheme above —
// keccak256(blockHash, parentHash, batchIndex)
_batchId = keccak256(abi.encode(_header.blockHash, _header.parentHash, 1));
bool verified;
(, , , verified) = rollup.batches(_batchId);
assertBoolEq(verified, false);
rollup.forceBatchFinalization(_batchId);
(, , , verified) = rollup.batches(_batchId);
assertBoolEq(verified, true);
}
}

View File

@@ -330,6 +330,10 @@ func (m *Manager) CollectProofs(sess *session) {
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
m.mu.Lock()
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()

View File

@@ -20,7 +20,7 @@ create table l1_message
);
comment
on column l1_message.status is 'undefined, pending, submitted, confirmed';
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
create unique index l1_message_hash_uindex
on l1_message (msg_hash);

View File

@@ -21,7 +21,7 @@ create table l2_message
);
comment
on column l2_message.status is 'undefined, pending, submitted, confirmed';
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
create unique index l2_message_hash_uindex
on l2_message (msg_hash);

View File

@@ -260,7 +260,7 @@ func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
}
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
row := o.db.QueryRowx(`SELECT * FROM block_batch WHERE rollup_status = $1 OR rollup_status = $2 ORDER BY index DESC;`, RollupFinalized, RollupFinalizationSkipped)
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, RollupFinalized)
batch := &BlockBatch{}
if err := row.StructScan(batch); err != nil {
return nil, err
@@ -407,7 +407,16 @@ func (o *blockBatchOrm) GetAssignedBatchIDs() ([]string, error) {
return ids, rows.Close()
}
func (o *blockBatchOrm) UpdateSkippedBatches() error {
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
return err
// UpdateSkippedBatches marks committed batches whose proving was skipped or
// failed as finalization-skipped, in a single UPDATE statement. It returns the
// number of rows changed so callers can log how many batches were skipped.
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
if err != nil {
return 0, err
}
// Surface any driver error from RowsAffected rather than reporting a bogus count.
count, err := res.RowsAffected()
if err != nil {
return 0, err
}
return count, nil
}

View File

@@ -28,6 +28,9 @@ const (
// MsgFailed represents the from_layer message status is failed
MsgFailed
// MsgExpired represents the from_layer message status is expired
MsgExpired
)
// L1Message is structure of stored layer1 bridge message
@@ -151,7 +154,7 @@ type BlockBatchOrm interface {
UpdateCommitTxHashAndRollupStatus(ctx context.Context, id string, commitTxHash string, status RollupStatus) error
UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, id string, finalizeTxHash string, status RollupStatus) error
GetAssignedBatchIDs() ([]string, error)
UpdateSkippedBatches() error
UpdateSkippedBatches() (int64, error)
GetCommitTxHash(id string) (sql.NullString, error) // for unit tests only
GetFinalizeTxHash(id string) (sql.NullString, error) // for unit tests only
@@ -161,7 +164,7 @@ type BlockBatchOrm interface {
type L1MessageOrm interface {
GetL1MessageByNonce(nonce uint64) (*L1Message, error)
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error)
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
GetL1ProcessedNonce() (int64, error)
SaveL1Messages(ctx context.Context, messages []*L1Message) error
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error

View File

@@ -45,8 +45,8 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
}
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error) {
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC;`, status)
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
if err != nil {
return nil, err
}