Compare commits


24 Commits

Author SHA1 Message Date
Péter Garamvölgyi
97dded9619 improve logs 2023-01-25 17:19:03 +01:00
Péter Garamvölgyi
f48762bf33 Revert "more debug logs"
This reverts commit 42190feb6c.
2023-01-25 16:48:50 +01:00
Péter Garamvölgyi
42190feb6c more debug logs 2023-01-25 16:32:41 +01:00
Péter Garamvölgyi
56080204c5 add more logs 2023-01-25 16:29:34 +01:00
Péter Garamvölgyi
2674dfaf69 change log level 2023-01-25 16:14:19 +01:00
Péter Garamvölgyi
5bffb151a3 use correct parent hash format 2023-01-25 15:46:35 +01:00
Péter Garamvölgyi
fadaec7add retry commitBatch with manual gas estimation 2023-01-25 12:19:25 +01:00
Péter Garamvölgyi
16576b6f53 fix(bridge): Handle duplicate messages (#261) 2023-01-20 12:09:47 +01:00
HAOYUatHZ
aa885f068f docs(db): fix sql comments (#260) 2023-01-20 09:44:19 +08:00
HAOYUatHZ
1f764a579d refactor(bridge): refactor layer2MessageOrm.GetL2Messages() (#243) 2023-01-20 09:29:34 +08:00
HAOYUatHZ
91ee767669 doc(db): update sql comment (#259) 2023-01-20 09:27:50 +08:00
Péter Garamvölgyi
7eac41691e feat(bridge): handle expired messages correctly (#257) 2023-01-19 23:53:34 +01:00
Péter Garamvölgyi
d9516890b0 feat(bridge): handle expired messages (#256) 2023-01-19 23:21:17 +01:00
Péter Garamvölgyi
ddb96bb732 feat(bridge): add more l1 relayer logs (#255) 2023-01-19 22:37:02 +01:00
Péter Garamvölgyi
e419dd8d5c fix(bridge): add limit to GetL1MessagesByStatus (#254) 2023-01-19 22:19:48 +01:00
Péter Garamvölgyi
c99c65bdfd fix(bridge): execute watcher loops independently (#253) 2023-01-19 21:14:45 +01:00
colin
18fd7f56a8 fix(coordinator): reset roller state when proof session timeout (#210)
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-01-19 21:37:30 +08:00
Péter Garamvölgyi
a319dc1cff bugfix(bridge): only relay messages for finalized batches (#251)
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
2023-01-19 20:54:09 +08:00
colin
52bf3a55fc fix(coordinator): fix CollectProofs for multi-roller & add tests (#252)
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
2023-01-19 14:52:20 +08:00
Péter Garamvölgyi
598e10e4fc feat(bridge): update skipped batches in a single db operation (#242) 2023-01-18 15:21:41 +01:00
maskpp
eed3f42731 fix(coordinator): Fix bug in coordinator.GetNumberOfIdleRollers function. (#247)
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-01-18 20:51:19 +08:00
colin
5a4bea8ccd fix(coordinator): set session failed when all rollers have submitted invalid proof (#241)
Co-authored-by: colinlyguo <colinlyguo@gmail.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-01-18 20:01:43 +08:00
ChuhanJin
5b37b63d89 build(jenkinsfile): replace status_check with comment in coverage test (#249)
Co-authored-by: vincent <419436363@qq.com>
2023-01-18 19:44:47 +08:00
HAOYUatHZ
5e5c4f7701 build(roller&coordinator): fix Makefile (#245)
Co-authored-by: Lawliet-Chan <1576710154@qq.com>
2023-01-18 16:51:55 +08:00
24 changed files with 472 additions and 130 deletions

Jenkinsfile (vendored, 2 changes)
View File

@@ -107,7 +107,7 @@ pipeline {
script {
currentBuild.result = 'SUCCESS'
}
step([$class: 'CompareCoverageAction', publishResultAs: 'statusCheck', scmVars: [GIT_URL: env.GIT_URL]])
step([$class: 'CompareCoverageAction', publishResultAs: 'Comment', scmVars: [GIT_URL: env.GIT_URL]])
}
}
}

View File

@@ -72,15 +72,20 @@ func NewLayer1Relayer(ctx context.Context, ethClient *ethclient.Client, l1Confir
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer1Relayer) ProcessSavedEvents() {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending)
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending, 100)
if err != nil {
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
return
}
if len(msgs) > 0 {
log.Info("Processing L1 messages", "count", len(msgs))
}
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process event", "err", err)
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
}
@@ -108,7 +113,13 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
return err
}
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
return err
}
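The two err.Error() comparisons above turn specific revert reasons into terminal message statuses instead of retrying forever. A minimal, self-contained sketch of that mapping, with local stand-ins for the orm status constants (statusForRevert and MsgStatus are illustrative, not part of this diff):

package main

import (
	"errors"
	"fmt"
)

type MsgStatus int

const (
	MsgExpired MsgStatus = iota + 1
	MsgConfirmed
)

// statusForRevert maps a revert reason to a terminal message status, as
// processSavedEvent does after SendTransaction fails: expired or already-
// executed messages are marked in the database rather than retried.
func statusForRevert(err error) (MsgStatus, bool) {
	if err == nil {
		return 0, false
	}
	switch err.Error() {
	case "execution reverted: Message expired":
		return MsgExpired, true
	case "execution reverted: Message successfully executed":
		return MsgConfirmed, true
	}
	return 0, false
}

func main() {
	status, terminal := statusForRevert(errors.New("execution reverted: Message expired"))
	fmt.Println(status == MsgExpired, terminal) // true true
}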
@@ -123,6 +134,8 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
// Start the relayer process
func (r *Layer1Relayer) Start() {
log.Info("Starting l1/relayer")
go func() {
// trigger by timer
ticker := time.NewTicker(3 * time.Second)

View File

@@ -81,6 +81,8 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6
// Start the Watcher module.
func (w *Watcher) Start() {
log.Info("Starting l1/watcher")
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

View File

@@ -40,8 +40,7 @@ func newBatchProposer(cfg *config.BatchProposerConfig, orm database.OrmFactory)
}
}
func (w *batchProposer) tryProposeBatch(wg *sync.WaitGroup) {
defer wg.Done()
func (w *batchProposer) tryProposeBatch() {
w.mutex.Lock()
defer w.mutex.Unlock()

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"math/big"
"os"
"sync"
"testing"
"github.com/scroll-tech/go-ethereum/core/types"
@@ -50,10 +49,7 @@ func testBatchProposer(t *testing.T) {
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, db)
var wg sync.WaitGroup
wg.Add(1)
proposer.tryProposeBatch(&wg)
wg.Wait()
proposer.tryProposeBatch()
infos, err := db.GetUnbatchedBlocks(map[string]interface{}{},
fmt.Sprintf("order by number ASC LIMIT %d", 100))

View File

@@ -77,6 +77,7 @@ func TestFunction(t *testing.T) {
t.Run("TestL2RelayerProcessSaveEvents", testL2RelayerProcessSaveEvents)
t.Run("testL2RelayerProcessPendingBatches", testL2RelayerProcessPendingBatches)
t.Run("testL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches)
t.Run("testL2RelayerSkipBatches", testL2RelayerSkipBatches)
t.Run("testBatchProposer", testBatchProposer)

View File

@@ -107,7 +107,8 @@ func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": orm.MsgPending},
fmt.Sprintf("AND height<=%d ORDER BY nonce ASC LIMIT %d", batch.EndBlockNumber, processMsgLimit),
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
@@ -165,7 +166,13 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
return err
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
@@ -262,7 +269,26 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
txID := id + "-commit"
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
if err != nil && err.Error() == "execution reverted: Parent batch hasn't been committed" {
// check parent is committing
batches, err = r.db.GetBlockBatches(map[string]interface{}{"end_block_hash": batch.ParentHash})
if err != nil || len(batches) == 0 {
log.Error("Failed to get parent batch from db", "batch_id", id, "parent_hash", batch.ParentHash, "err", err)
return
}
parentBatch := batches[0]
if parentBatch.RollupStatus >= orm.RollupCommitting {
// retry with manual gas estimation
gasLimit := estimateCommitBatchGas(len(data), len(layer2Batch.Blocks))
hash, err = r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, gasLimit)
log.Info("commitBatch tx resent with manual gas estimation ", "id", id, "index", batch.Index, "gasLimit", gasLimit, "hash", hash.String(), "err", err)
}
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
@@ -282,6 +308,15 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
// ProcessCommittedBatches submit proof to layer 1 rollup contract
func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
defer wg.Done()
// set skipped batches in a single db operation
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order
batches, err := r.db.GetCommittedBatches(1)
if err != nil {
@@ -311,6 +346,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
return
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
}
@@ -357,7 +394,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
txID := id + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
hash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
@@ -384,6 +421,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
// Start the relayer process
func (r *Layer2Relayer) Start() {
log.Info("Starting l2/relayer")
go func() {
// trigger by timer
ticker := time.NewTicker(time.Second)
@@ -455,3 +494,12 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
}
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
}
func estimateCommitBatchGas(callDataLength int, numBlocks int) uint64 {
gasLimit := uint64(0)
gasLimit += 16 * uint64(callDataLength) // calldata cost
gasLimit += 4*2100 + 3*22100 // fixed cost per batch
gasLimit += 4 * 22100 * uint64(numBlocks) // cost per block in batch
gasLimit = gasLimit * 12 / 10 // apply multiplier
return gasLimit
}
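To get a feel for the heuristic above, here is the same arithmetic applied to a made-up batch of 50,000 calldata bytes across 10 blocks (illustrative numbers, not from this changeset):

package main

import "fmt"

func main() {
	callDataLength, numBlocks := 50000, 10
	gasLimit := uint64(0)
	gasLimit += 16 * uint64(callDataLength)   // calldata cost:        800,000
	gasLimit += 4*2100 + 3*22100              // fixed cost per batch:  74,700
	gasLimit += 4 * 22100 * uint64(numBlocks) // cost per block:       884,000
	gasLimit = gasLimit * 12 / 10             // 1,758,700 * 1.2
	fmt.Println(gasLimit)                     // 2110440
}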

View File

@@ -203,3 +203,71 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, orm.RollupFinalizing, status)
}
func testL2RelayerSkipBatches(t *testing.T) {
// Create db handler and reset db.
db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
createBatch := func(rollupStatus orm.RollupStatus, provingStatus orm.ProvingStatus) string {
dbTx, err := db.Beginx()
assert.NoError(t, err)
batchID, err := db.NewBatchInDBTx(dbTx, &orm.BlockInfo{}, &orm.BlockInfo{}, "0", 1, 194676) // startBlock & endBlock & parentHash & totalTxNum & totalL2Gas don't really matter here
assert.NoError(t, err)
err = dbTx.Commit()
assert.NoError(t, err)
err = db.UpdateRollupStatus(context.Background(), batchID, rollupStatus)
assert.NoError(t, err)
tProof := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
tInstanceCommitments := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
err = db.UpdateProofByID(context.Background(), batchID, tProof, tInstanceCommitments, 100)
assert.NoError(t, err)
err = db.UpdateProvingStatus(batchID, provingStatus)
assert.NoError(t, err)
return batchID
}
skipped := []string{
createBatch(orm.RollupCommitted, orm.ProvingTaskSkipped),
createBatch(orm.RollupCommitted, orm.ProvingTaskFailed),
}
notSkipped := []string{
createBatch(orm.RollupPending, orm.ProvingTaskSkipped),
createBatch(orm.RollupCommitting, orm.ProvingTaskSkipped),
createBatch(orm.RollupFinalizing, orm.ProvingTaskSkipped),
createBatch(orm.RollupFinalized, orm.ProvingTaskSkipped),
createBatch(orm.RollupPending, orm.ProvingTaskFailed),
createBatch(orm.RollupCommitting, orm.ProvingTaskFailed),
createBatch(orm.RollupFinalizing, orm.ProvingTaskFailed),
createBatch(orm.RollupFinalized, orm.ProvingTaskFailed),
createBatch(orm.RollupCommitted, orm.ProvingTaskVerified),
}
var wg = sync.WaitGroup{}
wg.Add(1)
relayer.ProcessCommittedBatches(&wg)
wg.Wait()
for _, id := range skipped {
status, err := db.GetRollupStatus(id)
assert.NoError(t, err)
assert.Equal(t, orm.RollupFinalizationSkipped, status)
}
for _, id := range notSkipped {
status, err := db.GetRollupStatus(id)
assert.NoError(t, err)
assert.NotEqual(t, orm.RollupFinalizationSkipped, status)
}
}

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"math/big"
"reflect"
"sync"
"time"
geth "github.com/scroll-tech/go-ethereum"
@@ -77,41 +76,91 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
// Start the Listening process
func (w *WatcherClient) Start() {
log.Info("Starting l2/watcher")
go func() {
if reflect.ValueOf(w.orm).IsNil() {
panic("must run L2 watcher with DB")
}
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for ; true; <-ticker.C {
select {
case <-w.stopCh:
return
default:
// get current height
number, err := w.BlockNumber(w.ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
var wg sync.WaitGroup
wg.Add(3)
go w.tryFetchRunningMissingBlocks(w.ctx, &wg, number)
go w.FetchContractEvent(&wg, number)
go w.batchProposer.tryProposeBatch(&wg)
wg.Wait()
}
}
ctx, cancel := context.WithCancel(w.ctx)
// trace fetcher loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// get current height
number, err := w.BlockNumber(ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
w.tryFetchRunningMissingBlocks(ctx, number)
}
}
}(ctx)
// event fetcher loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// get current height
number, err := w.BlockNumber(ctx)
if err != nil {
log.Error("failed to get_BlockNumber", "err", err)
continue
}
if number >= w.confirmations {
number = number - w.confirmations
} else {
number = 0
}
w.FetchContractEvent(number)
}
}
}(ctx)
// batch proposer loop
go func(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.batchProposer.tryProposeBatch()
}
}
}(ctx)
<-w.stopCh
cancel()
}()
}
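All three goroutines introduced above share one shape: a per-loop ticker plus shutdown through the cancellable context created in Start(), so a slow trace fetch no longer delays event fetching or batch proposing. A minimal sketch of that skeleton (runLoop is a hypothetical helper, not in the diff):

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop captures the shared pattern: each goroutine owns its ticker and
// exits when the parent context is cancelled.
func runLoop(ctx context.Context, interval time.Duration, tick func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tick(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runLoop(ctx, 100*time.Millisecond, func(context.Context) { fmt.Println("tick") })
	time.Sleep(350 * time.Millisecond)
	cancel()
}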
@@ -123,8 +172,7 @@ func (w *WatcherClient) Stop() {
const blockTracesFetchLimit = uint64(10)
// try fetch missing blocks if inconsistent
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, wg *sync.WaitGroup, blockHeight uint64) {
defer wg.Done()
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
// Get newest block in DB. must have blocks at that time.
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
@@ -181,8 +229,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
const contractEventsBlocksFetchLimit = int64(10)
// FetchContractEvent pull latest event logs from given contract address and save in DB
func (w *WatcherClient) FetchContractEvent(wg *sync.WaitGroup, blockHeight uint64) {
defer wg.Done()
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
defer func() {
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()

View File

@@ -44,7 +44,7 @@ func testCreateNewWatcherAndStop(t *testing.T) {
numTransactions := 3
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
for i := 0; i < numTransactions; i++ {
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil)
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil, 0)
assert.NoError(t, err)
<-newSender.ConfirmChan()
}

View File

@@ -154,18 +154,21 @@ func (s *Sender) NumberOfAccounts() int {
return len(s.auths.accounts)
}
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte) (*FeeData, error) {
// estimate gas limit
gasLimit, err := s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
if err != nil {
return nil, err
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (*FeeData, error) {
if gasLimit == 0 {
// estimate gas limit
var err error
gasLimit, err = s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
if err != nil {
return nil, err
}
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
}
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
// @todo change it when Scroll enable EIP1559
if s.config.TxType != DynamicFeeTxType {
// estimate gas price
var gasPrice *big.Int
gasPrice, err = s.client.SuggestGasPrice(s.ctx)
gasPrice, err := s.client.SuggestGasPrice(s.ctx)
if err != nil {
return nil, err
}
@@ -189,7 +192,7 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
}
// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (hash common.Hash, err error) {
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
@@ -213,9 +216,10 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
tx *types.Transaction
)
// estimate gas fee
if feeData, err = s.getFeeData(auth, target, value, data); err != nil {
if feeData, err = s.getFeeData(auth, target, value, data, gasLimit); err != nil {
return
}
if tx, err = s.createAndSendTx(auth, feeData, target, value, data, nil); err == nil {
// add pending transaction to queue
pending := &PendingTransaction{
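The new gasLimit parameter treats 0 as a sentinel: zero keeps the automatic estimate (padded by 50%), and any other value is used verbatim, which is how the commitBatch retry above injects its manual estimate. A self-contained sketch of that convention (chooseGasLimit is illustrative, not the sender's API):

package main

import "fmt"

// chooseGasLimit mirrors the branch in getFeeData: a caller-supplied 0 falls
// back to the node's estimate plus a 50% safety margin; non-zero wins outright.
func chooseGasLimit(manual, estimated uint64) uint64 {
	if manual != 0 {
		return manual
	}
	return estimated * 15 / 10
}

func main() {
	fmt.Println(chooseGasLimit(0, 1_000_000)) // 1500000: automatic estimation path
	fmt.Println(chooseGasLimit(2_110_440, 0)) // 2110440: manual retry path
}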

View File

@@ -87,7 +87,7 @@ func testBatchSender(t *testing.T, batchSize int) {
for i := 0; i < TXBatch; i++ {
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
id := strconv.Itoa(i + index*1000)
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil)
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil, 0)
if errors.Is(err, sender.ErrNoAvailableAccount) {
<-time.After(time.Second)
continue

View File

@@ -26,7 +26,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
defer db.Close()
var wg sync.WaitGroup
wg.Add(4)
wg.Add(3)
prepareContracts(t)
@@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
}
// l2 watch process events
l2Watcher.FetchContractEvent(&wg, sendReceipt.BlockNumber.Uint64())
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
// check db status
msg, err := db.GetL2MessageByNonce(nonce.Uint64())

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "prealpha-v11.3"
var tag = "prealpha-v11.14"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -15,7 +15,7 @@ test:
libzkp:
cd ../common/libzkp/impl && cargo build --release && cp ./target/release/libzkp.a ../interface/
cp -r ../common/libzkp/interface ./verifier/lib
rm -rf ./verifier/lib && cp -r ../common/libzkp/interface ./verifier/lib
coordinator: libzkp ## Builds the Coordinator instance.
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator ./cmd
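The added rm -rf matters because cp -r into an existing directory nests the source inside it (yielding ./verifier/lib/interface) instead of replacing ./verifier/lib; removing the directory first makes the copy idempotent across rebuilds. The roller Makefile below receives the same fix.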

View File

@@ -65,7 +65,6 @@ type Manager struct {
// A map containing proof failed or verify failed proof.
rollerPool cmap.ConcurrentMap
// TODO: once put into use, should add to graceful restart.
failedSessionInfos map[string]*SessionInfo
// A direct connection to the Halo2 verifier, used to verify
@@ -326,54 +325,74 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
select {
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
m.mu.Lock()
defer func() {
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
// Pick a random winner.
// First, round up the keys that actually sent in a valid proof.
var participatingRollers []string
for pk, roller := range sess.info.Rollers {
if roller.Status == orm.RollerProofValid {
participatingRollers = append(participatingRollers, pk)
}
}
// Ensure we got at least one proof before selecting a winner.
if len(participatingRollers) == 0 {
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
log.Warn(errMsg, "session id", sess.info.ID)
// Set status as skipped.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
return
}
// Now, select a random index for this slice.
randIndex := mathrand.Intn(len(participatingRollers))
_ = participatingRollers[randIndex]
// TODO: reward winner
return
case ret := <-sess.finishChan:
m.mu.Lock()
sess.info.Rollers[ret.pk].Status = ret.status
m.mu.Unlock()
if err := m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
}
for {
select {
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
m.mu.Lock()
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
// Pick a random winner.
// First, round up the keys that actually sent in a valid proof.
var participatingRollers []string
for pk, roller := range sess.info.Rollers {
if roller.Status == orm.RollerProofValid {
participatingRollers = append(participatingRollers, pk)
}
}
// Ensure we got at least one proof before selecting a winner.
if len(participatingRollers) == 0 {
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
log.Warn(errMsg, "session id", sess.info.ID)
// Set status as skipped.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
return
}
// Now, select a random index for this slice.
randIndex := mathrand.Intn(len(participatingRollers))
_ = participatingRollers[randIndex]
// TODO: reward winner
return
case ret := <-sess.finishChan:
m.mu.Lock()
sess.info.Rollers[ret.pk].Status = ret.status
if m.isSessionFailed(sess.info) {
if err := m.orm.UpdateProvingStatus(ret.id, orm.ProvingTaskFailed); err != nil {
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
}
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
m.mu.Unlock()
}
}
}
func (m *Manager) isSessionFailed(info *orm.SessionInfo) bool {
for _, roller := range info.Rollers {
if roller.Status != orm.RollerProofInvalid {
return false
}
}
return true
}
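isSessionFailed treats a session as failed only once every roller has submitted an invalid proof; a single pending, assigned, or valid roller keeps it alive. A stand-alone illustration, with a local RollerStatus standing in for the orm constants:

package main

import "fmt"

type RollerStatus int

const (
	RollerAssigned RollerStatus = iota
	RollerProofValid
	RollerProofInvalid
)

// allInvalid mirrors isSessionFailed: false as soon as any roller is not in
// the invalid-proof state.
func allInvalid(statuses []RollerStatus) bool {
	for _, s := range statuses {
		if s != RollerProofInvalid {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allInvalid([]RollerStatus{RollerProofInvalid, RollerProofInvalid})) // true: session failed
	fmt.Println(allInvalid([]RollerStatus{RollerProofInvalid, RollerAssigned}))     // false: still waiting
}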
// APIs collect API services.
func (m *Manager) APIs() []rpc.API {
return []rpc.API{
@@ -435,6 +454,7 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller()
if roller == nil {
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)

View File

@@ -64,6 +64,8 @@ func TestApis(t *testing.T) {
t.Run("TestHandshake", testHandshake)
t.Run("TestFailedHandshake", testFailedHandshake)
t.Run("TestSeveralConnections", testSeveralConnections)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
// TODO: Restart roller alone when received task, can add this test case in integration-test.
//t.Run("TestRollerReconnect", testRollerReconnect)
@@ -84,7 +86,7 @@ func testHandshake(t *testing.T) {
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
@@ -105,7 +107,7 @@ func testFailedHandshake(t *testing.T) {
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
@@ -171,7 +173,7 @@ func testSeveralConnections(t *testing.T) {
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
@@ -215,6 +217,124 @@ func testSeveralConnections(t *testing.T) {
}
}
}
func testValidProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create mock rollers.
rollers := make([]*mockRoller, 3)
for i := 0; i < len(rollers); i++ {
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
// only roller 0 submits valid proof.
rollers[i].waitTaskAndSendProof(t, time.Second, false, i == 0)
}
defer func() {
// close connection
for _, roller := range rollers {
roller.close()
}
}()
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
var ids = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range ids {
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
assert.NoError(t, err)
ids[i] = ID
}
assert.NoError(t, dbTx.Commit())
// verify proof status
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(ids) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByID(ids[0])
assert.NoError(t, err)
if status == orm.ProvingTaskVerified {
ids = ids[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testInvalidProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create mock rollers.
rollers := make([]*mockRoller, 3)
for i := 0; i < len(rollers); i++ {
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
rollers[i].waitTaskAndSendProof(t, time.Second, false, false)
}
defer func() {
// close connection
for _, roller := range rollers {
roller.close()
}
}()
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
var ids = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range ids {
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
assert.NoError(t, err)
ids[i] = ID
}
assert.NoError(t, dbTx.Commit())
// verify proof status
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(ids) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByID(ids[0])
assert.NoError(t, err)
if status == orm.ProvingTaskFailed {
ids = ids[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testIdleRollerSelection(t *testing.T) {
// Create db handler and reset db.
@@ -225,7 +345,7 @@ func testIdleRollerSelection(t *testing.T) {
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
@@ -235,7 +355,7 @@ func testIdleRollerSelection(t *testing.T) {
rollers := make([]*mockRoller, 20)
for i := 0; i < len(rollers); i++ {
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
rollers[i].waitTaskAndSendProof(t, time.Second, false)
rollers[i].waitTaskAndSendProof(t, time.Second, false, true)
}
defer func() {
// close connection
@@ -294,12 +414,12 @@ func testGracefulRestart(t *testing.T) {
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
// create mock roller
roller := newMockRoller(t, "roller_test", wsURL)
// wait 10 seconds, coordinator restarts before roller submits proof
roller.waitTaskAndSendProof(t, 10*time.Second, false)
roller.waitTaskAndSendProof(t, 10*time.Second, false, true)
// wait for coordinator to dispatch task
<-time.After(5 * time.Second)
@@ -311,7 +431,7 @@ func testGracefulRestart(t *testing.T) {
rollerManager.Stop()
// Setup new coordinator and ws server.
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, wsURL)
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
newHandler.Shutdown(context.Background())
newRollerManager.Stop()
@@ -329,7 +449,7 @@ func testGracefulRestart(t *testing.T) {
}
// will overwrite the roller client for `SubmitProof`
roller.waitTaskAndSendProof(t, time.Millisecond*500, true)
roller.waitTaskAndSendProof(t, time.Millisecond*500, true, true)
defer roller.close()
// verify proof status
@@ -355,13 +475,13 @@ func testGracefulRestart(t *testing.T) {
}
}
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession uint8, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
// Get db handler.
db, err := database.NewOrmFactory(dbCfg)
assert.True(t, assert.NoError(t, err), "failed to get db handler.")
rollerManager, err = coordinator.New(context.Background(), &coordinator_config.RollerManagerConfig{
RollersPerSession: 1,
RollersPerSession: rollersPerSession,
Verifier: &coordinator_config.VerifierConfig{MockMode: true},
CollectionTime: 1,
TokenTimeToLive: 5,
@@ -448,7 +568,7 @@ func (r *mockRoller) releaseTasks() {
}
// Wait for the proof task, after receiving the proof task, roller submits proof after proofTime secs.
func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool) {
func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool, validProof bool) {
// simulating the case that the roller first disconnects and then reconnects to the coordinator
// the Subscription and its `Err()` channel will be closed, and the coordinator will `freeRoller()`
if reconnect {
@@ -464,10 +584,10 @@ func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration,
r.releaseTasks()
r.stopCh = make(chan struct{})
go r.loop(t, r.client, proofTime, r.stopCh)
go r.loop(t, r.client, proofTime, validProof, r.stopCh)
}
func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.Duration, stopCh chan struct{}) {
func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.Duration, validProof bool, stopCh chan struct{}) {
for {
select {
case task := <-r.taskCh:
@@ -485,6 +605,9 @@ func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.D
Proof: &message.AggProof{},
},
}
if !validProof {
proof.Status = message.StatusProofError
}
assert.NoError(t, proof.Sign(r.privKey))
ok, err := client.SubmitProof(context.Background(), proof)
assert.NoError(t, err)

View File

@@ -106,17 +106,18 @@ func (m *Manager) freeTaskIDForRoller(pk string, id string) {
}
// GetNumberOfIdleRollers return the count of idle rollers.
func (m *Manager) GetNumberOfIdleRollers() int {
pubkeys := m.rollerPool.Keys()
for i := 0; i < len(pubkeys); i++ {
if val, ok := m.rollerPool.Get(pubkeys[i]); ok {
func (m *Manager) GetNumberOfIdleRollers() (count int) {
for i, pk := range m.rollerPool.Keys() {
if val, ok := m.rollerPool.Get(pk); ok {
r := val.(*rollerNode)
if r.TaskIDs.Count() > 0 {
pubkeys[i], pubkeys = pubkeys[len(pubkeys)-1], pubkeys[:len(pubkeys)-1]
if r.TaskIDs.Count() == 0 {
count++
}
} else {
log.Error("rollerPool Get fail", "pk", pk, "idx", i, "pk len", pk)
}
}
return len(pubkeys)
return count
}
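The removed loop mutated pubkeys while indexing into it, so the element swapped in from the tail was never examined. A minimal repro of that skip, with nonzero counts standing in for busy rollers (hypothetical data, not the coordinator code):

package main

import "fmt"

func main() {
	counts := []int{1, 0, 1} // busy, idle, busy: exactly one idle roller
	for i := 0; i < len(counts); i++ {
		if counts[i] > 0 {
			// swap-remove, as the old code did; i++ then skips the swapped-in value
			counts[i], counts = counts[len(counts)-1], counts[:len(counts)-1]
		}
	}
	fmt.Println(len(counts)) // 2: the busy roller swapped to index 0 was counted as idle
}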
func (m *Manager) selectRoller() *rollerNode {
@@ -128,6 +129,8 @@ func (m *Manager) selectRoller() *rollerNode {
if r.TaskIDs.Count() == 0 {
return r
}
} else {
log.Error("rollerPool Get fail", "pk", pubkeys[idx.Int64()], "idx", idx.Int64(), "pk len", len(pubkeys))
}
pubkeys[idx.Int64()], pubkeys = pubkeys[0], pubkeys[1:]
}

View File

@@ -20,7 +20,7 @@ create table l1_message
);
comment
on column l1_message.status is 'undefined, pending, submitted, confirmed';
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
create unique index l1_message_hash_uindex
on l1_message (msg_hash);

View File

@@ -21,7 +21,7 @@ create table l2_message
);
comment
on column l2_message.status is 'undefined, pending, submitted, confirmed';
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
create unique index l2_message_hash_uindex
on l2_message (msg_hash);

View File

@@ -260,7 +260,7 @@ func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
}
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
row := o.db.QueryRowx(`SELECT * FROM block_batch WHERE rollup_status = $1 OR rollup_status = $2 ORDER BY index DESC;`, RollupFinalized, RollupFinalizationSkipped)
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, RollupFinalized)
batch := &BlockBatch{}
if err := row.StructScan(batch); err != nil {
return nil, err
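Beyond its shape, the rewritten query changes behavior: RollupFinalizationSkipped batches no longer qualify as the latest finalized batch; only RollupFinalized ones are considered.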
@@ -406,3 +406,17 @@ func (o *blockBatchOrm) GetAssignedBatchIDs() ([]string, error) {
return ids, rows.Close()
}
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
if err != nil {
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
return 0, err
}
return count, nil
}

View File

@@ -28,6 +28,9 @@ const (
// MsgFailed represents the from_layer message status is failed
MsgFailed
// MsgExpired represents the from_layer message status is expired
MsgExpired
)
// L1Message is structure of stored layer1 bridge message
@@ -151,6 +154,7 @@ type BlockBatchOrm interface {
UpdateCommitTxHashAndRollupStatus(ctx context.Context, id string, commitTxHash string, status RollupStatus) error
UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, id string, finalizeTxHash string, status RollupStatus) error
GetAssignedBatchIDs() ([]string, error)
UpdateSkippedBatches() (int64, error)
GetCommitTxHash(id string) (sql.NullString, error) // for unit tests only
GetFinalizeTxHash(id string) (sql.NullString, error) // for unit tests only
@@ -160,7 +164,7 @@ type BlockBatchOrm interface {
type L1MessageOrm interface {
GetL1MessageByNonce(nonce uint64) (*L1Message, error)
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error)
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
GetL1ProcessedNonce() (int64, error)
SaveL1Messages(ctx context.Context, messages []*L1Message) error
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error

View File

@@ -45,8 +45,8 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
}
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error) {
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC;`, status)
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
if err != nil {
return nil, err
}

View File

@@ -11,7 +11,7 @@ endif
libzkp:
cd ../common/libzkp/impl && cargo build --release && cp ./target/release/libzkp.a ../interface/
cp -r ../common/libzkp/interface ./prover/lib
rm -rf ./prover/lib && cp -r ../common/libzkp/interface ./prover/lib
roller: libzkp ## Build the Roller instance.
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/roller ./cmd