mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
Compare commits
12 Commits
prealpha-v
...
contract/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45326ddd4a | ||
|
|
16576b6f53 | ||
|
|
aa885f068f | ||
|
|
1f764a579d | ||
|
|
91ee767669 | ||
|
|
7eac41691e | ||
|
|
d9516890b0 | ||
|
|
ddb96bb732 | ||
|
|
e419dd8d5c | ||
|
|
c99c65bdfd | ||
|
|
18fd7f56a8 | ||
|
|
a319dc1cff |
@@ -72,15 +72,20 @@ func NewLayer1Relayer(ctx context.Context, ethClient *ethclient.Client, l1Confir
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending)
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending, 100)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(msgs) > 0 {
|
||||
log.Info("Processing L1 messages", "count", len(msgs))
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err = r.processSavedEvent(msg); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process event", "err", err)
|
||||
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -109,6 +114,12 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
}
|
||||
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -40,8 +40,7 @@ func newBatchProposer(cfg *config.BatchProposerConfig, orm database.OrmFactory)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *batchProposer) tryProposeBatch(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
func (w *batchProposer) tryProposeBatch() {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
@@ -50,10 +49,7 @@ func testBatchProposer(t *testing.T) {
|
||||
BatchTimeSec: 1,
|
||||
BatchBlocksLimit: 100,
|
||||
}, db)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
proposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
proposer.tryProposeBatch()
|
||||
|
||||
infos, err := db.GetUnbatchedBlocks(map[string]interface{}{},
|
||||
fmt.Sprintf("order by number ASC LIMIT %d", 100))
|
||||
|
||||
@@ -107,7 +107,8 @@ func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d ORDER BY nonce ASC LIMIT %d", batch.EndBlockNumber, processMsgLimit),
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@@ -166,6 +167,12 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
@@ -284,9 +291,11 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
// set skipped batches in a single db operation
|
||||
if err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
geth "github.com/scroll-tech/go-ethereum"
|
||||
@@ -82,36 +81,84 @@ func (w *WatcherClient) Start() {
|
||||
panic("must run L2 watcher with DB")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
ctx, cancel := context.WithCancel(w.ctx)
|
||||
|
||||
for ; true; <-ticker.C {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
// trace fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
default:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(w.ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.tryFetchRunningMissingBlocks(ctx, number)
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
go w.tryFetchRunningMissingBlocks(w.ctx, &wg, number)
|
||||
go w.FetchContractEvent(&wg, number)
|
||||
go w.batchProposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// event fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.FetchContractEvent(number)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// batch proposer loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
w.batchProposer.tryProposeBatch()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
<-w.stopCh
|
||||
cancel()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -123,8 +170,7 @@ func (w *WatcherClient) Stop() {
|
||||
const blockTracesFetchLimit = uint64(10)
|
||||
|
||||
// try fetch missing blocks if inconsistent
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
|
||||
// Get newest block in DB. must have blocks at that time.
|
||||
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
|
||||
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
|
||||
@@ -181,8 +227,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
|
||||
const contractEventsBlocksFetchLimit = int64(10)
|
||||
|
||||
// FetchContractEvent pull latest event logs from given contract address and save in DB
|
||||
func (w *WatcherClient) FetchContractEvent(wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
|
||||
defer func() {
|
||||
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
|
||||
}()
|
||||
|
||||
@@ -26,7 +26,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4)
|
||||
wg.Add(3)
|
||||
|
||||
prepareContracts(t)
|
||||
|
||||
@@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
|
||||
}
|
||||
|
||||
// l2 watch process events
|
||||
l2Watcher.FetchContractEvent(&wg, sendReceipt.BlockNumber.Uint64())
|
||||
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
|
||||
|
||||
// check db status
|
||||
msg, err := db.GetL2MessageByNonce(nonce.Uint64())
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "prealpha-v11.6"
|
||||
var tag = "prealpha-v11.14"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -300,6 +300,20 @@ contract ZKRollup is OwnableUpgradeable, IZKRollup {
|
||||
|
||||
/**************************************** Restricted Functions ****************************************/
|
||||
|
||||
/// @notice Explicit finalize batch by owner.
|
||||
/// @dev This is temporary workaround to skip batch proving in some situations.
|
||||
/// And it will be removed on mainnet launch.
|
||||
/// @param _batchId The id of batch to finalize.
|
||||
function forceBatchFinalization(bytes32 _batchId) external onlyOwner {
|
||||
Layer2BatchStored storage _batch = batches[_batchId];
|
||||
require(_batch.batchHash != bytes32(0), "No such batch");
|
||||
require(!_batch.verified, "Batch already verified");
|
||||
|
||||
_batch.verified = true;
|
||||
|
||||
emit FinalizeBatch(_batchId, _batch.batchHash, _batch.batchIndex, _batch.parentHash);
|
||||
}
|
||||
|
||||
/// @notice Update the address of operator.
|
||||
/// @dev This function can only called by contract owner.
|
||||
/// @param _newOperator The new operator address to update.
|
||||
|
||||
@@ -255,4 +255,50 @@ contract ZKRollupTest is DSTestPlus {
|
||||
assertEq(rollup.finalizedBatches(1), bytes32(0));
|
||||
}
|
||||
}
|
||||
|
||||
function testForceBatchFinalization() public {
|
||||
// not owner
|
||||
hevm.startPrank(address(1));
|
||||
hevm.expectRevert("Ownable: caller is not the owner");
|
||||
rollup.forceBatchFinalization(bytes32(0));
|
||||
hevm.stopPrank();
|
||||
|
||||
IZKRollup.Layer2BlockHeader memory _header;
|
||||
|
||||
// import fake genesis
|
||||
_header.blockHash = bytes32(uint256(1));
|
||||
rollup.importGenesisBlock(_header);
|
||||
|
||||
// No such batch
|
||||
hevm.expectRevert("No such batch");
|
||||
rollup.forceBatchFinalization(bytes32(0));
|
||||
|
||||
// Batch already verified
|
||||
bytes32 _batchId = keccak256(abi.encode(_header.blockHash, bytes32(0), 0));
|
||||
hevm.expectRevert("Batch already verified");
|
||||
rollup.forceBatchFinalization(_batchId);
|
||||
|
||||
// import fake batch
|
||||
_header.blockHeight = 1;
|
||||
_header.parentHash = bytes32(uint256(1));
|
||||
_header.blockHash = bytes32(uint256(2));
|
||||
IZKRollup.Layer2Batch memory _batch;
|
||||
_batch.blocks = new IZKRollup.Layer2BlockHeader[](1);
|
||||
_batch.blocks[0] = _header;
|
||||
_batch.batchIndex = 1;
|
||||
_batch.parentHash = _header.parentHash;
|
||||
rollup.updateOperator(address(1));
|
||||
hevm.startPrank(address(1));
|
||||
rollup.commitBatch(_batch);
|
||||
hevm.stopPrank();
|
||||
|
||||
// force finalize
|
||||
_batchId = keccak256(abi.encode(_header.blockHash, _header.parentHash, 1));
|
||||
bool verified;
|
||||
(, , , verified) = rollup.batches(_batchId);
|
||||
assertBoolEq(verified, false);
|
||||
rollup.forceBatchFinalization(_batchId);
|
||||
(, , , verified) = rollup.batches(_batchId);
|
||||
assertBoolEq(verified, true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,6 +330,10 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
m.mu.Lock()
|
||||
defer func() {
|
||||
// TODO: remove the clean-up, rollers report healthy status.
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
|
||||
@@ -20,7 +20,7 @@ create table l1_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l1_message_hash_uindex
|
||||
on l1_message (msg_hash);
|
||||
|
||||
@@ -21,7 +21,7 @@ create table l2_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l2_message_hash_uindex
|
||||
on l2_message (msg_hash);
|
||||
|
||||
@@ -260,7 +260,7 @@ func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
|
||||
row := o.db.QueryRowx(`SELECT * FROM block_batch WHERE rollup_status = $1 OR rollup_status = $2 ORDER BY index DESC;`, RollupFinalized, RollupFinalizationSkipped)
|
||||
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, RollupFinalized)
|
||||
batch := &BlockBatch{}
|
||||
if err := row.StructScan(batch); err != nil {
|
||||
return nil, err
|
||||
@@ -407,7 +407,16 @@ func (o *blockBatchOrm) GetAssignedBatchIDs() ([]string, error) {
|
||||
return ids, rows.Close()
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) UpdateSkippedBatches() error {
|
||||
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
|
||||
return err
|
||||
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
|
||||
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
count, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
@@ -28,6 +28,9 @@ const (
|
||||
|
||||
// MsgFailed represents the from_layer message status is failed
|
||||
MsgFailed
|
||||
|
||||
// MsgExpired represents the from_layer message status is expired
|
||||
MsgExpired
|
||||
)
|
||||
|
||||
// L1Message is structure of stored layer1 bridge message
|
||||
@@ -151,7 +154,7 @@ type BlockBatchOrm interface {
|
||||
UpdateCommitTxHashAndRollupStatus(ctx context.Context, id string, commitTxHash string, status RollupStatus) error
|
||||
UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, id string, finalizeTxHash string, status RollupStatus) error
|
||||
GetAssignedBatchIDs() ([]string, error)
|
||||
UpdateSkippedBatches() error
|
||||
UpdateSkippedBatches() (int64, error)
|
||||
|
||||
GetCommitTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
GetFinalizeTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
@@ -161,7 +164,7 @@ type BlockBatchOrm interface {
|
||||
type L1MessageOrm interface {
|
||||
GetL1MessageByNonce(nonce uint64) (*L1Message, error)
|
||||
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
|
||||
GetL1ProcessedNonce() (int64, error)
|
||||
SaveL1Messages(ctx context.Context, messages []*L1Message) error
|
||||
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error
|
||||
|
||||
@@ -45,8 +45,8 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
|
||||
}
|
||||
|
||||
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC;`, status)
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user