mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 07:28:08 -05:00
Compare commits
30 Commits
prealpha-v
...
goerli
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97dded9619 | ||
|
|
f48762bf33 | ||
|
|
42190feb6c | ||
|
|
56080204c5 | ||
|
|
2674dfaf69 | ||
|
|
5bffb151a3 | ||
|
|
fadaec7add | ||
|
|
16576b6f53 | ||
|
|
aa885f068f | ||
|
|
1f764a579d | ||
|
|
91ee767669 | ||
|
|
7eac41691e | ||
|
|
d9516890b0 | ||
|
|
ddb96bb732 | ||
|
|
e419dd8d5c | ||
|
|
c99c65bdfd | ||
|
|
18fd7f56a8 | ||
|
|
a319dc1cff | ||
|
|
52bf3a55fc | ||
|
|
598e10e4fc | ||
|
|
eed3f42731 | ||
|
|
5a4bea8ccd | ||
|
|
5b37b63d89 | ||
|
|
5e5c4f7701 | ||
|
|
09dc638652 | ||
|
|
b598a01e7f | ||
|
|
0fcdb6f824 | ||
|
|
5a95dcf5ba | ||
|
|
d0c63e75df | ||
|
|
676b8a2230 |
6
.github/pull_request_template.md
vendored
6
.github/pull_request_template.md
vendored
@@ -1,3 +1,7 @@
|
||||
1. Purpose or design rationale of this PR
|
||||
|
||||
|
||||
2. Does this PR involve a new deployment, and involve a new git tag & docker image tag? If so, has `tag` in `common/version.go` been updated?
|
||||
3. Is this PR a breaking change? If so, have it been attached a `breaking-change` label?
|
||||
|
||||
|
||||
3. Is this PR a breaking change? If so, have it been attached a `breaking-change` label?
|
||||
|
||||
38
Jenkinsfile
vendored
38
Jenkinsfile
vendored
@@ -17,18 +17,6 @@ pipeline {
|
||||
}
|
||||
stages {
|
||||
stage('Build') {
|
||||
when {
|
||||
anyOf {
|
||||
changeset "Jenkinsfile"
|
||||
changeset "build/**"
|
||||
changeset "go.work**"
|
||||
changeset "bridge/**"
|
||||
changeset "coordinator/**"
|
||||
changeset "common/**"
|
||||
changeset "database/**"
|
||||
changeset "tests/**"
|
||||
}
|
||||
}
|
||||
parallel {
|
||||
stage('Build Prerequisite') {
|
||||
steps {
|
||||
@@ -70,18 +58,6 @@ pipeline {
|
||||
}
|
||||
}
|
||||
stage('Parallel Test') {
|
||||
when {
|
||||
anyOf {
|
||||
changeset "Jenkinsfile"
|
||||
changeset "build/**"
|
||||
changeset "go.work**"
|
||||
changeset "bridge/**"
|
||||
changeset "coordinator/**"
|
||||
changeset "common/**"
|
||||
changeset "database/**"
|
||||
changeset "tests/**"
|
||||
}
|
||||
}
|
||||
parallel{
|
||||
stage('Test bridge package') {
|
||||
steps {
|
||||
@@ -126,24 +102,12 @@ pipeline {
|
||||
}
|
||||
}
|
||||
stage('Compare Coverage') {
|
||||
when {
|
||||
anyOf {
|
||||
changeset "Jenkinsfile"
|
||||
changeset "build/**"
|
||||
changeset "go.work**"
|
||||
changeset "bridge/**"
|
||||
changeset "coordinator/**"
|
||||
changeset "common/**"
|
||||
changeset "database/**"
|
||||
changeset "tests/**"
|
||||
}
|
||||
}
|
||||
steps {
|
||||
sh "./build/post-test-report-coverage.sh"
|
||||
script {
|
||||
currentBuild.result = 'SUCCESS'
|
||||
}
|
||||
step([$class: 'CompareCoverageAction', publishResultAs: 'statusCheck', scmVars: [GIT_URL: env.GIT_URL]])
|
||||
step([$class: 'CompareCoverageAction', publishResultAs: 'Comment', scmVars: [GIT_URL: env.GIT_URL]])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,15 +72,20 @@ func NewLayer1Relayer(ctx context.Context, ethClient *ethclient.Client, l1Confir
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending)
|
||||
msgs, err := r.db.GetL1MessagesByStatus(orm.MsgPending, 100)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(msgs) > 0 {
|
||||
log.Info("Processing L1 messages", "count", len(msgs))
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err = r.processSavedEvent(msg); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process event", "err", err)
|
||||
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -108,7 +113,13 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.sender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -123,6 +134,8 @@ func (r *Layer1Relayer) processSavedEvent(msg *orm.L1Message) error {
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer1Relayer) Start() {
|
||||
log.Info("Starting l1/relayer")
|
||||
|
||||
go func() {
|
||||
// trigger by timer
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
|
||||
@@ -81,6 +81,8 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6
|
||||
|
||||
// Start the Watcher module.
|
||||
func (w *Watcher) Start() {
|
||||
log.Info("Starting l1/watcher")
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
@@ -199,10 +201,10 @@ func (w *Watcher) FetchContractEvent(blockHeight uint64) error {
|
||||
for _, msg := range relayedMessageEvents {
|
||||
if msg.isSuccessful {
|
||||
// succeed
|
||||
err = w.db.UpdateLayer1StatusAndLayer2Hash(w.ctx, msg.msgHash.String(), orm.MsgConfirmed, msg.txHash.String())
|
||||
err = w.db.UpdateLayer2StatusAndLayer1Hash(w.ctx, msg.msgHash.String(), orm.MsgConfirmed, msg.txHash.String())
|
||||
} else {
|
||||
// failed
|
||||
err = w.db.UpdateLayer1StatusAndLayer2Hash(w.ctx, msg.msgHash.String(), orm.MsgFailed, msg.txHash.String())
|
||||
err = w.db.UpdateLayer2StatusAndLayer1Hash(w.ctx, msg.msgHash.String(), orm.MsgFailed, msg.txHash.String())
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Failed to update layer1 status and layer2 hash", "err", err)
|
||||
@@ -250,7 +252,7 @@ func (w *Watcher) parseBridgeEventLogs(logs []types.Log) ([]*orm.L1Message, []re
|
||||
event.Target = common.HexToAddress(vLog.Topics[1].String())
|
||||
l1Messages = append(l1Messages, &orm.L1Message{
|
||||
Nonce: event.MessageNonce.Uint64(),
|
||||
MsgHash: utils.ComputeMessageHash(event.Target, event.Sender, event.Value, event.Fee, event.Deadline, event.Message, event.MessageNonce).String(),
|
||||
MsgHash: utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.Fee, event.Deadline, event.Message, event.MessageNonce).String(),
|
||||
Height: vLog.BlockNumber,
|
||||
Sender: event.Sender.String(),
|
||||
Value: event.Value.String(),
|
||||
|
||||
@@ -40,8 +40,7 @@ func newBatchProposer(cfg *config.BatchProposerConfig, orm database.OrmFactory)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *batchProposer) tryProposeBatch(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
func (w *batchProposer) tryProposeBatch() {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
@@ -50,10 +49,7 @@ func testBatchProposer(t *testing.T) {
|
||||
BatchTimeSec: 1,
|
||||
BatchBlocksLimit: 100,
|
||||
}, db)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
proposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
proposer.tryProposeBatch()
|
||||
|
||||
infos, err := db.GetUnbatchedBlocks(map[string]interface{}{},
|
||||
fmt.Sprintf("order by number ASC LIMIT %d", 100))
|
||||
|
||||
@@ -77,6 +77,7 @@ func TestFunction(t *testing.T) {
|
||||
t.Run("TestL2RelayerProcessSaveEvents", testL2RelayerProcessSaveEvents)
|
||||
t.Run("testL2RelayerProcessPendingBatches", testL2RelayerProcessPendingBatches)
|
||||
t.Run("testL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches)
|
||||
t.Run("testL2RelayerSkipBatches", testL2RelayerSkipBatches)
|
||||
|
||||
t.Run("testBatchProposer", testBatchProposer)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package l2
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"sync"
|
||||
@@ -92,6 +93,8 @@ func NewLayer2Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
}, nil
|
||||
}
|
||||
|
||||
const processMsgLimit = 100
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
@@ -102,7 +105,11 @@ func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) {
|
||||
}
|
||||
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2MessagesByStatusUpToHeight(orm.MsgPending, batch.EndBlockNumber)
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": orm.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
|
||||
@@ -159,7 +166,13 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, orm.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
@@ -183,7 +196,7 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro
|
||||
func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
// batches are sorted by batch index in increasing order
|
||||
batchesInDB, err := r.db.GetPendingBatches()
|
||||
batchesInDB, err := r.db.GetPendingBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch pending L2 batches", "err", err)
|
||||
return
|
||||
@@ -256,7 +269,26 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
|
||||
|
||||
txID := id + "-commit"
|
||||
// add suffix `-commit` to avoid duplication with finalize tx in unit tests
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
hash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
|
||||
if err != nil && err.Error() == "execution reverted: Parent batch hasn't been committed" {
|
||||
|
||||
// check parent is committing
|
||||
batches, err = r.db.GetBlockBatches(map[string]interface{}{"end_block_hash": batch.ParentHash})
|
||||
if err != nil || len(batches) == 0 {
|
||||
log.Error("Failed to get parent batch from db", "batch_id", id, "parent_hash", batch.ParentHash, "err", err)
|
||||
return
|
||||
}
|
||||
parentBatch := batches[0]
|
||||
|
||||
if parentBatch.RollupStatus >= orm.RollupCommitting {
|
||||
// retry with manual gas estimation
|
||||
gasLimit := estimateCommitBatchGas(len(data), len(layer2Batch.Blocks))
|
||||
hash, err = r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, gasLimit)
|
||||
log.Info("commitBatch tx resent with manual gas estimation ", "id", id, "index", batch.Index, "gasLimit", gasLimit, "hash", hash.String(), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatch tx to layer1 ", "id", id, "index", batch.Index, "err", err)
|
||||
@@ -276,8 +308,17 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) {
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
// set skipped batches in a single db operation
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
batches, err := r.db.GetCommittedBatches()
|
||||
batches, err := r.db.GetCommittedBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batches", "err", err)
|
||||
return
|
||||
@@ -305,6 +346,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
return
|
||||
|
||||
case orm.ProvingTaskFailed, orm.ProvingTaskSkipped:
|
||||
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, id, orm.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "id", id, "err", err)
|
||||
}
|
||||
@@ -351,7 +394,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
|
||||
txID := id + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data)
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
hash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
@@ -378,6 +421,8 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) {
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer2Relayer) Start() {
|
||||
log.Info("Starting l2/relayer")
|
||||
|
||||
go func() {
|
||||
// trigger by timer
|
||||
ticker := time.NewTicker(time.Second)
|
||||
@@ -449,3 +494,12 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
|
||||
}
|
||||
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
|
||||
}
|
||||
|
||||
func estimateCommitBatchGas(callDataLength int, numBlocks int) uint64 {
|
||||
gasLimit := uint64(0)
|
||||
gasLimit += 16 * uint64(callDataLength) // calldata cost
|
||||
gasLimit += 4*2100 + 3*22100 // fixed cost per batch
|
||||
gasLimit += 4 * 22100 * uint64(numBlocks) // cost per block in batch
|
||||
gasLimit = gasLimit * 12 / 10 // apply multiplier
|
||||
return gasLimit
|
||||
}
|
||||
|
||||
@@ -203,3 +203,71 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupFinalizing, status)
|
||||
}
|
||||
|
||||
func testL2RelayerSkipBatches(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
|
||||
defer db.Close()
|
||||
|
||||
l2Cfg := cfg.L2Config
|
||||
relayer, err := NewLayer2Relayer(context.Background(), db, l2Cfg.RelayerConfig)
|
||||
assert.NoError(t, err)
|
||||
defer relayer.Stop()
|
||||
|
||||
createBatch := func(rollupStatus orm.RollupStatus, provingStatus orm.ProvingStatus) string {
|
||||
dbTx, err := db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
batchID, err := db.NewBatchInDBTx(dbTx, &orm.BlockInfo{}, &orm.BlockInfo{}, "0", 1, 194676) // startBlock & endBlock & parentHash & totalTxNum & totalL2Gas don't really matter here
|
||||
assert.NoError(t, err)
|
||||
err = dbTx.Commit()
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.UpdateRollupStatus(context.Background(), batchID, rollupStatus)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tProof := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
|
||||
tInstanceCommitments := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
|
||||
err = db.UpdateProofByID(context.Background(), batchID, tProof, tInstanceCommitments, 100)
|
||||
assert.NoError(t, err)
|
||||
err = db.UpdateProvingStatus(batchID, provingStatus)
|
||||
assert.NoError(t, err)
|
||||
|
||||
return batchID
|
||||
}
|
||||
|
||||
skipped := []string{
|
||||
createBatch(orm.RollupCommitted, orm.ProvingTaskSkipped),
|
||||
createBatch(orm.RollupCommitted, orm.ProvingTaskFailed),
|
||||
}
|
||||
|
||||
notSkipped := []string{
|
||||
createBatch(orm.RollupPending, orm.ProvingTaskSkipped),
|
||||
createBatch(orm.RollupCommitting, orm.ProvingTaskSkipped),
|
||||
createBatch(orm.RollupFinalizing, orm.ProvingTaskSkipped),
|
||||
createBatch(orm.RollupFinalized, orm.ProvingTaskSkipped),
|
||||
createBatch(orm.RollupPending, orm.ProvingTaskFailed),
|
||||
createBatch(orm.RollupCommitting, orm.ProvingTaskFailed),
|
||||
createBatch(orm.RollupFinalizing, orm.ProvingTaskFailed),
|
||||
createBatch(orm.RollupFinalized, orm.ProvingTaskFailed),
|
||||
createBatch(orm.RollupCommitted, orm.ProvingTaskVerified),
|
||||
}
|
||||
|
||||
var wg = sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
relayer.ProcessCommittedBatches(&wg)
|
||||
wg.Wait()
|
||||
|
||||
for _, id := range skipped {
|
||||
status, err := db.GetRollupStatus(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupFinalizationSkipped, status)
|
||||
}
|
||||
|
||||
for _, id := range notSkipped {
|
||||
status, err := db.GetRollupStatus(id)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, orm.RollupFinalizationSkipped, status)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
geth "github.com/scroll-tech/go-ethereum"
|
||||
@@ -77,41 +76,91 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
|
||||
|
||||
// Start the Listening process
|
||||
func (w *WatcherClient) Start() {
|
||||
log.Info("Starting l2/watcher")
|
||||
|
||||
go func() {
|
||||
if reflect.ValueOf(w.orm).IsNil() {
|
||||
panic("must run L2 watcher with DB")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
ctx, cancel := context.WithCancel(w.ctx)
|
||||
|
||||
for ; true; <-ticker.C {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
return
|
||||
// trace fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
default:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(w.ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.tryFetchRunningMissingBlocks(ctx, number)
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
go w.tryFetchRunningMissingBlocks(w.ctx, &wg, number)
|
||||
go w.fetchContractEvent(&wg, number)
|
||||
go w.batchProposer.tryProposeBatch(&wg)
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// event fetcher loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// get current height
|
||||
number, err := w.BlockNumber(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get_BlockNumber", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if number >= w.confirmations {
|
||||
number = number - w.confirmations
|
||||
} else {
|
||||
number = 0
|
||||
}
|
||||
|
||||
w.FetchContractEvent(number)
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
// batch proposer loop
|
||||
go func(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
w.batchProposer.tryProposeBatch()
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
<-w.stopCh
|
||||
cancel()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -123,8 +172,7 @@ func (w *WatcherClient) Stop() {
|
||||
const blockTracesFetchLimit = uint64(10)
|
||||
|
||||
// try fetch missing blocks if inconsistent
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
|
||||
// Get newest block in DB. must have blocks at that time.
|
||||
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
|
||||
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
|
||||
@@ -181,8 +229,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
|
||||
const contractEventsBlocksFetchLimit = int64(10)
|
||||
|
||||
// FetchContractEvent pull latest event logs from given contract address and save in DB
|
||||
func (w *WatcherClient) fetchContractEvent(wg *sync.WaitGroup, blockHeight uint64) {
|
||||
defer wg.Done()
|
||||
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
|
||||
defer func() {
|
||||
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
|
||||
}()
|
||||
@@ -282,7 +329,7 @@ func (w *WatcherClient) parseBridgeEventLogs(logs []types.Log) ([]*orm.L2Message
|
||||
event.Target = common.HexToAddress(vLog.Topics[1].String())
|
||||
l2Messages = append(l2Messages, &orm.L2Message{
|
||||
Nonce: event.MessageNonce.Uint64(),
|
||||
MsgHash: utils.ComputeMessageHash(event.Target, event.Sender, event.Value, event.Fee, event.Deadline, event.Message, event.MessageNonce).String(),
|
||||
MsgHash: utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.Fee, event.Deadline, event.Message, event.MessageNonce).String(),
|
||||
Height: vLog.BlockNumber,
|
||||
Sender: event.Sender.String(),
|
||||
Value: event.Value.String(),
|
||||
|
||||
@@ -44,7 +44,7 @@ func testCreateNewWatcherAndStop(t *testing.T) {
|
||||
numTransactions := 3
|
||||
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
for i := 0; i < numTransactions; i++ {
|
||||
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil)
|
||||
_, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil, 0)
|
||||
assert.NoError(t, err)
|
||||
<-newSender.ConfirmChan()
|
||||
}
|
||||
@@ -112,7 +112,7 @@ func testMonitorBridgeContract(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
t.Log("Height in DB is", height)
|
||||
assert.Greater(t, height, int64(previousHeight))
|
||||
msgs, err := db.GetL2MessagesByStatus(orm.MsgPending)
|
||||
msgs, err := db.GetL2Messages(map[string]interface{}{"status": orm.MsgPending})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, len(msgs))
|
||||
}
|
||||
@@ -184,7 +184,7 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
t.Log("LatestHeight is", height)
|
||||
assert.Greater(t, height, int64(previousHeight)) // height must be greater than previousHeight because confirmations is 0
|
||||
msgs, err := db.GetL2MessagesByStatus(orm.MsgPending)
|
||||
msgs, err := db.GetL2Messages(map[string]interface{}{"status": orm.MsgPending})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 5, len(msgs))
|
||||
}
|
||||
|
||||
@@ -154,18 +154,21 @@ func (s *Sender) NumberOfAccounts() int {
|
||||
return len(s.auths.accounts)
|
||||
}
|
||||
|
||||
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte) (*FeeData, error) {
|
||||
// estimate gas limit
|
||||
gasLimit, err := s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (*FeeData, error) {
|
||||
if gasLimit == 0 {
|
||||
// estimate gas limit
|
||||
var err error
|
||||
gasLimit, err = s.client.EstimateGas(s.ctx, geth.CallMsg{From: auth.From, To: target, Value: value, Data: data})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
|
||||
}
|
||||
gasLimit = gasLimit * 15 / 10 // 50% extra gas to void out of gas error
|
||||
|
||||
// @todo change it when Scroll enable EIP1559
|
||||
if s.config.TxType != DynamicFeeTxType {
|
||||
// estimate gas price
|
||||
var gasPrice *big.Int
|
||||
gasPrice, err = s.client.SuggestGasPrice(s.ctx)
|
||||
gasPrice, err := s.client.SuggestGasPrice(s.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -189,7 +192,7 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
|
||||
}
|
||||
|
||||
// SendTransaction send a signed L2tL1 transaction.
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte) (hash common.Hash, err error) {
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, gasLimit uint64) (hash common.Hash, err error) {
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
@@ -213,9 +216,10 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
tx *types.Transaction
|
||||
)
|
||||
// estimate gas fee
|
||||
if feeData, err = s.getFeeData(auth, target, value, data); err != nil {
|
||||
if feeData, err = s.getFeeData(auth, target, value, data, gasLimit); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if tx, err = s.createAndSendTx(auth, feeData, target, value, data, nil); err == nil {
|
||||
// add pending transaction to queue
|
||||
pending := &PendingTransaction{
|
||||
|
||||
@@ -87,7 +87,7 @@ func testBatchSender(t *testing.T, batchSize int) {
|
||||
for i := 0; i < TXBatch; i++ {
|
||||
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
|
||||
id := strconv.Itoa(i + index*1000)
|
||||
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil)
|
||||
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil, 0)
|
||||
if errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
<-time.After(time.Second)
|
||||
continue
|
||||
|
||||
@@ -55,16 +55,20 @@ func setupEnv(t *testing.T) {
|
||||
var err error
|
||||
privateKey, err = crypto.ToECDSA(common.FromHex("1212121212121212121212121212121212121212121212121212121212121212"))
|
||||
assert.NoError(t, err)
|
||||
messagePrivateKey, err := crypto.ToECDSA(common.FromHex("1212121212121212121212121212121212121212121212121212121212121213"))
|
||||
assert.NoError(t, err)
|
||||
rollupPrivateKey, err := crypto.ToECDSA(common.FromHex("1212121212121212121212121212121212121212121212121212121212121214"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Load config.
|
||||
cfg, err = config.NewConfig("../config.json")
|
||||
assert.NoError(t, err)
|
||||
cfg.L1Config.Confirmations = 0
|
||||
cfg.L1Config.RelayerConfig.MessageSenderPrivateKeys = []*ecdsa.PrivateKey{privateKey}
|
||||
cfg.L1Config.RelayerConfig.RollupSenderPrivateKeys = []*ecdsa.PrivateKey{privateKey}
|
||||
cfg.L1Config.RelayerConfig.MessageSenderPrivateKeys = []*ecdsa.PrivateKey{messagePrivateKey}
|
||||
cfg.L1Config.RelayerConfig.RollupSenderPrivateKeys = []*ecdsa.PrivateKey{rollupPrivateKey}
|
||||
cfg.L2Config.Confirmations = 0
|
||||
cfg.L2Config.RelayerConfig.MessageSenderPrivateKeys = []*ecdsa.PrivateKey{privateKey}
|
||||
cfg.L2Config.RelayerConfig.RollupSenderPrivateKeys = []*ecdsa.PrivateKey{privateKey}
|
||||
cfg.L2Config.RelayerConfig.MessageSenderPrivateKeys = []*ecdsa.PrivateKey{messagePrivateKey}
|
||||
cfg.L2Config.RelayerConfig.RollupSenderPrivateKeys = []*ecdsa.PrivateKey{rollupPrivateKey}
|
||||
|
||||
// Create l1geth container.
|
||||
l1gethImg = docker.NewTestL1Docker(t)
|
||||
@@ -89,6 +93,47 @@ func setupEnv(t *testing.T) {
|
||||
// Create l1 and l2 auth
|
||||
l1Auth = prepareAuth(t, l1Client, privateKey)
|
||||
l2Auth = prepareAuth(t, l2Client, privateKey)
|
||||
|
||||
// send some balance to message and rollup sender
|
||||
transferEther(t, l1Auth, l1Client, messagePrivateKey)
|
||||
transferEther(t, l1Auth, l1Client, rollupPrivateKey)
|
||||
transferEther(t, l2Auth, l2Client, messagePrivateKey)
|
||||
transferEther(t, l2Auth, l2Client, rollupPrivateKey)
|
||||
}
|
||||
|
||||
func transferEther(t *testing.T, auth *bind.TransactOpts, client *ethclient.Client, privateKey *ecdsa.PrivateKey) {
|
||||
targetAddress := crypto.PubkeyToAddress(privateKey.PublicKey)
|
||||
|
||||
gasPrice, err := client.SuggestGasPrice(context.Background())
|
||||
assert.NoError(t, err)
|
||||
gasPrice.Mul(gasPrice, big.NewInt(2))
|
||||
|
||||
// Get pending nonce
|
||||
nonce, err := client.PendingNonceAt(context.Background(), auth.From)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// 200 ether should be enough
|
||||
value, ok := big.NewInt(0).SetString("0xad78ebc5ac6200000", 0)
|
||||
assert.Equal(t, ok, true)
|
||||
|
||||
tx := types.NewTx(&types.LegacyTx{
|
||||
Nonce: nonce,
|
||||
To: &targetAddress,
|
||||
Value: value,
|
||||
Gas: 500000,
|
||||
GasPrice: gasPrice,
|
||||
})
|
||||
signedTx, err := auth.Signer(auth.From, tx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = client.SendTransaction(context.Background(), signedTx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
receipt, err := bind.WaitMined(context.Background(), client, signedTx)
|
||||
assert.NoError(t, err)
|
||||
if receipt.Status != types.ReceiptStatusSuccessful {
|
||||
t.Fatalf("Call failed")
|
||||
}
|
||||
}
|
||||
|
||||
func free(t *testing.T) {
|
||||
@@ -150,6 +195,9 @@ func TestFunction(t *testing.T) {
|
||||
// l1 rollup and watch rollup events
|
||||
t.Run("TestCommitBatchAndFinalizeBatch", testCommitBatchAndFinalizeBatch)
|
||||
|
||||
// l2 message
|
||||
t.Run("testRelayL2MessageSucceed", testRelayL2MessageSucceed)
|
||||
|
||||
t.Cleanup(func() {
|
||||
free(t)
|
||||
})
|
||||
|
||||
175
bridge/tests/l2_message_relay_test.go
Normal file
175
bridge/tests/l2_message_relay_test.go
Normal file
@@ -0,0 +1,175 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"scroll-tech/database"
|
||||
"scroll-tech/database/migrate"
|
||||
"scroll-tech/database/orm"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"scroll-tech/bridge/l1"
|
||||
"scroll-tech/bridge/l2"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func testRelayL2MessageSucceed(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
|
||||
defer db.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
|
||||
prepareContracts(t)
|
||||
|
||||
// Create L2Relayer
|
||||
l2Cfg := cfg.L2Config
|
||||
l2Relayer, err := l2.NewLayer2Relayer(context.Background(), db, l2Cfg.RelayerConfig)
|
||||
assert.NoError(t, err)
|
||||
defer l2Relayer.Stop()
|
||||
|
||||
// Create L2Watcher
|
||||
l2Watcher := l2.NewL2WatcherClient(context.Background(), l2Client, 0, l2Cfg.BatchProposerConfig, l2Cfg.L2MessengerAddress, db)
|
||||
|
||||
// Create L1Watcher
|
||||
l1Cfg := cfg.L1Config
|
||||
l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, 0, l1Cfg.L1MessengerAddress, l1Cfg.RollupContractAddress, db)
|
||||
|
||||
// send message through l2 messenger contract
|
||||
nonce, err := l2MessengerInstance.MessageNonce(&bind.CallOpts{})
|
||||
assert.NoError(t, err)
|
||||
sendTx, err := l2MessengerInstance.SendMessage(l2Auth, l1Auth.From, big.NewInt(0), common.Hex2Bytes("00112233"), big.NewInt(0))
|
||||
assert.NoError(t, err)
|
||||
sendReceipt, err := bind.WaitMined(context.Background(), l2Client, sendTx)
|
||||
assert.NoError(t, err)
|
||||
if sendReceipt.Status != types.ReceiptStatusSuccessful || err != nil {
|
||||
t.Fatalf("Call failed")
|
||||
}
|
||||
|
||||
// l2 watch process events
|
||||
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
|
||||
|
||||
// check db status
|
||||
msg, err := db.GetL2MessageByNonce(nonce.Uint64())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, msg.Status, orm.MsgPending)
|
||||
assert.Equal(t, msg.Sender, l2Auth.From.String())
|
||||
assert.Equal(t, msg.Target, l1Auth.From.String())
|
||||
|
||||
// add fake blocks
|
||||
traces := []*types.BlockTrace{
|
||||
{
|
||||
Header: &types.Header{
|
||||
Number: sendReceipt.BlockNumber,
|
||||
ParentHash: common.Hash{},
|
||||
Difficulty: big.NewInt(0),
|
||||
BaseFee: big.NewInt(0),
|
||||
},
|
||||
StorageTrace: &types.StorageTrace{},
|
||||
},
|
||||
}
|
||||
err = db.InsertBlockTraces(traces)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// add fake batch
|
||||
dbTx, err := db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
batchID, err := db.NewBatchInDBTx(dbTx,
|
||||
&orm.BlockInfo{
|
||||
Number: traces[0].Header.Number.Uint64(),
|
||||
Hash: traces[0].Header.Hash().String(),
|
||||
ParentHash: traces[0].Header.ParentHash.String(),
|
||||
},
|
||||
&orm.BlockInfo{
|
||||
Number: traces[0].Header.Number.Uint64(),
|
||||
Hash: traces[0].Header.Hash().String(),
|
||||
ParentHash: traces[0].Header.ParentHash.String(),
|
||||
},
|
||||
traces[0].Header.ParentHash.String(), 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
err = db.SetBatchIDForBlocksInDBTx(dbTx, []uint64{
|
||||
traces[0].Header.Number.Uint64(),
|
||||
traces[0].Header.Number.Uint64()}, batchID)
|
||||
assert.NoError(t, err)
|
||||
err = dbTx.Commit()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// add dummy proof
|
||||
tProof := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
|
||||
tInstanceCommitments := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}
|
||||
err = db.UpdateProofByID(context.Background(), batchID, tProof, tInstanceCommitments, 100)
|
||||
assert.NoError(t, err)
|
||||
err = db.UpdateProvingStatus(batchID, orm.ProvingTaskVerified)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// process pending batch and check status
|
||||
l2Relayer.ProcessPendingBatches(&wg)
|
||||
status, err := db.GetRollupStatus(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupCommitting, status)
|
||||
commitTxHash, err := db.GetCommitTxHash(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, commitTxHash.Valid)
|
||||
commitTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(commitTxHash.String))
|
||||
assert.NoError(t, err)
|
||||
commitTxReceipt, err := bind.WaitMined(context.Background(), l1Client, commitTx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(commitTxReceipt.Logs), 1)
|
||||
|
||||
// fetch CommitBatch rollup events
|
||||
err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64())
|
||||
assert.NoError(t, err)
|
||||
status, err = db.GetRollupStatus(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupCommitted, status)
|
||||
|
||||
// process committed batch and check status
|
||||
l2Relayer.ProcessCommittedBatches(&wg)
|
||||
status, err = db.GetRollupStatus(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupFinalizing, status)
|
||||
finalizeTxHash, err := db.GetFinalizeTxHash(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, finalizeTxHash.Valid)
|
||||
finalizeTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(finalizeTxHash.String))
|
||||
assert.NoError(t, err)
|
||||
finalizeTxReceipt, err := bind.WaitMined(context.Background(), l1Client, finalizeTx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
|
||||
|
||||
// fetch FinalizeBatch events
|
||||
err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64())
|
||||
assert.NoError(t, err)
|
||||
status, err = db.GetRollupStatus(batchID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.RollupFinalized, status)
|
||||
|
||||
// process l2 messages
|
||||
l2Relayer.ProcessSavedEvents(&wg)
|
||||
msg, err = db.GetL2MessageByNonce(nonce.Uint64())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, msg.Status, orm.MsgSubmitted)
|
||||
relayTxHash, err := db.GetRelayL2MessageTxHash(nonce.Uint64())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, relayTxHash.Valid)
|
||||
relayTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(relayTxHash.String))
|
||||
assert.NoError(t, err)
|
||||
relayTxReceipt, err := bind.WaitMined(context.Background(), l1Client, relayTx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(relayTxReceipt.Logs), 1)
|
||||
|
||||
// fetch message relayed events
|
||||
err = l1Watcher.FetchContractEvent(relayTxReceipt.BlockNumber.Uint64())
|
||||
assert.NoError(t, err)
|
||||
msg, err = db.GetL2MessageByNonce(nonce.Uint64())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, msg.Status, orm.MsgConfirmed)
|
||||
}
|
||||
@@ -98,7 +98,6 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
|
||||
assert.Equal(t, len(commitTxReceipt.Logs), 1)
|
||||
|
||||
// fetch rollup events
|
||||
assert.NoError(t, err)
|
||||
err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64())
|
||||
assert.NoError(t, err)
|
||||
status, err = db.GetRollupStatus(batchID)
|
||||
@@ -131,7 +130,6 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
|
||||
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
|
||||
|
||||
// fetch rollup events
|
||||
assert.NoError(t, err)
|
||||
err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64())
|
||||
assert.NoError(t, err)
|
||||
status, err = db.GetRollupStatus(batchID)
|
||||
|
||||
@@ -20,8 +20,8 @@ func encodePacked(input ...[]byte) []byte {
|
||||
|
||||
// ComputeMessageHash compute the message hash
|
||||
func ComputeMessageHash(
|
||||
target common.Address,
|
||||
sender common.Address,
|
||||
target common.Address,
|
||||
value *big.Int,
|
||||
fee *big.Int,
|
||||
deadline *big.Int,
|
||||
@@ -29,8 +29,8 @@ func ComputeMessageHash(
|
||||
messageNonce *big.Int,
|
||||
) common.Hash {
|
||||
packed := encodePacked(
|
||||
target.Bytes(),
|
||||
sender.Bytes(),
|
||||
target.Bytes(),
|
||||
math.U256Bytes(value),
|
||||
math.U256Bytes(fee),
|
||||
math.U256Bytes(deadline),
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"scroll-tech/bridge/utils"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestKeccak2(t *testing.T) {
|
||||
@@ -28,15 +29,13 @@ func TestKeccak2(t *testing.T) {
|
||||
|
||||
func TestComputeMessageHash(t *testing.T) {
|
||||
hash := utils.ComputeMessageHash(
|
||||
common.HexToAddress("0xdafea492d9c6733ae3d56b7ed1adb60692c98bc5"),
|
||||
common.HexToAddress("0xeafea492d9c6733ae3d56b7ed1adb60692c98bf7"),
|
||||
big.NewInt(1),
|
||||
big.NewInt(2),
|
||||
big.NewInt(1234567),
|
||||
common.Hex2Bytes("0011223344"),
|
||||
big.NewInt(3),
|
||||
common.HexToAddress("0xd7227113b92e537aeda220d5a2f201b836e5879d"),
|
||||
common.HexToAddress("0x47c02b023b6787ef4e503df42bbb1a94f451a1c0"),
|
||||
big.NewInt(5000000000000000),
|
||||
big.NewInt(0),
|
||||
big.NewInt(1674204924),
|
||||
common.Hex2Bytes("8eaac8a30000000000000000000000007138b17fc82d7e954b3bd2f98d8166d03e5e569b0000000000000000000000007138b17fc82d7e954b3bd2f98d8166d03e5e569b0000000000000000000000000000000000000000000000000011c37937e0800000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000"),
|
||||
big.NewInt(30706),
|
||||
)
|
||||
if hash != common.HexToHash("0x58c9a5abfd2a558bb6a6fd5192b36fe9325d98763bafd3a51a1ea28a5d0b990b") {
|
||||
t.Fatalf("Invalid ComputeMessageHash, want %s, got %s", "0x58c9a5abfd2a558bb6a6fd5192b36fe9325d98763bafd3a51a1ea28a5d0b990b", hash.Hex())
|
||||
}
|
||||
assert.Equal(t, hash.String(), "0x920e59f62ca89a0f481d44961c55d299dd20c575693692d61fdf3ca579d8edf3")
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "prealpha-v11.0"
|
||||
var tag = "prealpha-v11.14"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -15,7 +15,7 @@ test:
|
||||
|
||||
libzkp:
|
||||
cd ../common/libzkp/impl && cargo build --release && cp ./target/release/libzkp.a ../interface/
|
||||
cp -r ../common/libzkp/interface ./verifier/lib
|
||||
rm -rf ./verifier/lib && cp -r ../common/libzkp/interface ./verifier/lib
|
||||
|
||||
coordinator: libzkp ## Builds the Coordinator instance.
|
||||
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator ./cmd
|
||||
|
||||
@@ -65,7 +65,6 @@ type Manager struct {
|
||||
// A map containing proof failed or verify failed proof.
|
||||
rollerPool cmap.ConcurrentMap
|
||||
|
||||
// TODO: once put into use, should add to graceful restart.
|
||||
failedSessionInfos map[string]*SessionInfo
|
||||
|
||||
// A direct connection to the Halo2 verifier, used to verify
|
||||
@@ -326,54 +325,74 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
|
||||
// CollectProofs collects proofs corresponding to a proof generation session.
|
||||
func (m *Manager) CollectProofs(sess *session) {
|
||||
select {
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
m.mu.Lock()
|
||||
defer func() {
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
m.mu.Lock()
|
||||
defer func() {
|
||||
// TODO: remove the clean-up, rollers report healthy status.
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Pick a random winner.
|
||||
// First, round up the keys that actually sent in a valid proof.
|
||||
var participatingRollers []string
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if roller.Status == orm.RollerProofValid {
|
||||
participatingRollers = append(participatingRollers, pk)
|
||||
// Pick a random winner.
|
||||
// First, round up the keys that actually sent in a valid proof.
|
||||
var participatingRollers []string
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if roller.Status == orm.RollerProofValid {
|
||||
participatingRollers = append(participatingRollers, pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Ensure we got at least one proof before selecting a winner.
|
||||
if len(participatingRollers) == 0 {
|
||||
// record failed session.
|
||||
errMsg := "proof generation session ended without receiving any valid proofs"
|
||||
m.addFailedSession(sess, errMsg)
|
||||
log.Warn(errMsg, "session id", sess.info.ID)
|
||||
// Set status as skipped.
|
||||
// Note that this is only a workaround for testnet here.
|
||||
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
|
||||
// so as to re-distribute the task in the future
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
// Ensure we got at least one proof before selecting a winner.
|
||||
if len(participatingRollers) == 0 {
|
||||
// record failed session.
|
||||
errMsg := "proof generation session ended without receiving any valid proofs"
|
||||
m.addFailedSession(sess, errMsg)
|
||||
log.Warn(errMsg, "session id", sess.info.ID)
|
||||
// Set status as skipped.
|
||||
// Note that this is only a workaround for testnet here.
|
||||
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
|
||||
// so as to re-distribute the task in the future
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Now, select a random index for this slice.
|
||||
randIndex := mathrand.Intn(len(participatingRollers))
|
||||
_ = participatingRollers[randIndex]
|
||||
// TODO: reward winner
|
||||
return
|
||||
}
|
||||
|
||||
// Now, select a random index for this slice.
|
||||
randIndex := mathrand.Intn(len(participatingRollers))
|
||||
_ = participatingRollers[randIndex]
|
||||
// TODO: reward winner
|
||||
return
|
||||
|
||||
case ret := <-sess.finishChan:
|
||||
m.mu.Lock()
|
||||
sess.info.Rollers[ret.pk].Status = ret.status
|
||||
m.mu.Unlock()
|
||||
if err := m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "pk", ret.pk, "error", err)
|
||||
case ret := <-sess.finishChan:
|
||||
m.mu.Lock()
|
||||
sess.info.Rollers[ret.pk].Status = ret.status
|
||||
if m.isSessionFailed(sess.info) {
|
||||
if err := m.orm.UpdateProvingStatus(ret.id, orm.ProvingTaskFailed); err != nil {
|
||||
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
|
||||
}
|
||||
}
|
||||
if err := m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "pk", ret.pk, "error", err)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) isSessionFailed(info *orm.SessionInfo) bool {
|
||||
for _, roller := range info.Rollers {
|
||||
if roller.Status != orm.RollerProofInvalid {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// APIs collect API services.
|
||||
func (m *Manager) APIs() []rpc.API {
|
||||
return []rpc.API{
|
||||
@@ -435,6 +454,7 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
|
||||
roller := m.selectRoller()
|
||||
if roller == nil {
|
||||
log.Info("selectRoller returns nil")
|
||||
break
|
||||
}
|
||||
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
|
||||
|
||||
@@ -64,6 +64,8 @@ func TestApis(t *testing.T) {
|
||||
t.Run("TestHandshake", testHandshake)
|
||||
t.Run("TestFailedHandshake", testFailedHandshake)
|
||||
t.Run("TestSeveralConnections", testSeveralConnections)
|
||||
t.Run("TestValidProof", testValidProof)
|
||||
t.Run("TestInvalidProof", testInvalidProof)
|
||||
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
|
||||
// TODO: Restart roller alone when received task, can add this test case in integration-test.
|
||||
//t.Run("TestRollerReconnect", testRollerReconnect)
|
||||
@@ -84,7 +86,7 @@ func testHandshake(t *testing.T) {
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
@@ -105,7 +107,7 @@ func testFailedHandshake(t *testing.T) {
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
@@ -171,7 +173,7 @@ func testSeveralConnections(t *testing.T) {
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
@@ -215,6 +217,124 @@ func testSeveralConnections(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
func testValidProof(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
l2db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
|
||||
defer l2db.Close()
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 3)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
|
||||
// only roller 0 submits valid proof.
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, i == 0)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var ids = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
ids[i] = ID
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
// verify proof status
|
||||
var (
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskVerified {
|
||||
ids = ids[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testInvalidProof(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
l2db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
|
||||
defer l2db.Close()
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 3)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, false)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var ids = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
ids[i] = ID
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
// verify proof status
|
||||
var (
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskFailed {
|
||||
ids = ids[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testIdleRollerSelection(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
@@ -225,7 +345,7 @@ func testIdleRollerSelection(t *testing.T) {
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
@@ -235,7 +355,7 @@ func testIdleRollerSelection(t *testing.T) {
|
||||
rollers := make([]*mockRoller, 20)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, true)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
@@ -294,12 +414,12 @@ func testGracefulRestart(t *testing.T) {
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
|
||||
// create mock roller
|
||||
roller := newMockRoller(t, "roller_test", wsURL)
|
||||
// wait 10 seconds, coordinator restarts before roller submits proof
|
||||
roller.waitTaskAndSendProof(t, 10*time.Second, false)
|
||||
roller.waitTaskAndSendProof(t, 10*time.Second, false, true)
|
||||
|
||||
// wait for coordinator to dispatch task
|
||||
<-time.After(5 * time.Second)
|
||||
@@ -311,7 +431,7 @@ func testGracefulRestart(t *testing.T) {
|
||||
rollerManager.Stop()
|
||||
|
||||
// Setup new coordinator and ws server.
|
||||
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, wsURL)
|
||||
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
newHandler.Shutdown(context.Background())
|
||||
newRollerManager.Stop()
|
||||
@@ -329,7 +449,7 @@ func testGracefulRestart(t *testing.T) {
|
||||
}
|
||||
|
||||
// will overwrite the roller client for `SubmitProof`
|
||||
roller.waitTaskAndSendProof(t, time.Millisecond*500, true)
|
||||
roller.waitTaskAndSendProof(t, time.Millisecond*500, true, true)
|
||||
defer roller.close()
|
||||
|
||||
// verify proof status
|
||||
@@ -355,13 +475,13 @@ func testGracefulRestart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
|
||||
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession uint8, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
|
||||
// Get db handler.
|
||||
db, err := database.NewOrmFactory(dbCfg)
|
||||
assert.True(t, assert.NoError(t, err), "failed to get db handler.")
|
||||
|
||||
rollerManager, err = coordinator.New(context.Background(), &coordinator_config.RollerManagerConfig{
|
||||
RollersPerSession: 1,
|
||||
RollersPerSession: rollersPerSession,
|
||||
Verifier: &coordinator_config.VerifierConfig{MockMode: true},
|
||||
CollectionTime: 1,
|
||||
TokenTimeToLive: 5,
|
||||
@@ -448,7 +568,7 @@ func (r *mockRoller) releaseTasks() {
|
||||
}
|
||||
|
||||
// Wait for the proof task, after receiving the proof task, roller submits proof after proofTime secs.
|
||||
func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool) {
|
||||
func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool, validProof bool) {
|
||||
// simulating the case that the roller first disconnects and then reconnects to the coordinator
|
||||
// the Subscription and its `Err()` channel will be closed, and the coordinator will `freeRoller()`
|
||||
if reconnect {
|
||||
@@ -464,10 +584,10 @@ func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration,
|
||||
r.releaseTasks()
|
||||
|
||||
r.stopCh = make(chan struct{})
|
||||
go r.loop(t, r.client, proofTime, r.stopCh)
|
||||
go r.loop(t, r.client, proofTime, validProof, r.stopCh)
|
||||
}
|
||||
|
||||
func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.Duration, stopCh chan struct{}) {
|
||||
func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.Duration, validProof bool, stopCh chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case task := <-r.taskCh:
|
||||
@@ -485,6 +605,9 @@ func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.D
|
||||
Proof: &message.AggProof{},
|
||||
},
|
||||
}
|
||||
if !validProof {
|
||||
proof.Status = message.StatusProofError
|
||||
}
|
||||
assert.NoError(t, proof.Sign(r.privKey))
|
||||
ok, err := client.SubmitProof(context.Background(), proof)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -106,17 +106,18 @@ func (m *Manager) freeTaskIDForRoller(pk string, id string) {
|
||||
}
|
||||
|
||||
// GetNumberOfIdleRollers return the count of idle rollers.
|
||||
func (m *Manager) GetNumberOfIdleRollers() int {
|
||||
pubkeys := m.rollerPool.Keys()
|
||||
for i := 0; i < len(pubkeys); i++ {
|
||||
if val, ok := m.rollerPool.Get(pubkeys[i]); ok {
|
||||
func (m *Manager) GetNumberOfIdleRollers() (count int) {
|
||||
for i, pk := range m.rollerPool.Keys() {
|
||||
if val, ok := m.rollerPool.Get(pk); ok {
|
||||
r := val.(*rollerNode)
|
||||
if r.TaskIDs.Count() > 0 {
|
||||
pubkeys[i], pubkeys = pubkeys[len(pubkeys)-1], pubkeys[:len(pubkeys)-1]
|
||||
if r.TaskIDs.Count() == 0 {
|
||||
count++
|
||||
}
|
||||
} else {
|
||||
log.Error("rollerPool Get fail", "pk", pk, "idx", i, "pk len", pk)
|
||||
}
|
||||
}
|
||||
return len(pubkeys)
|
||||
return count
|
||||
}
|
||||
|
||||
func (m *Manager) selectRoller() *rollerNode {
|
||||
@@ -128,6 +129,8 @@ func (m *Manager) selectRoller() *rollerNode {
|
||||
if r.TaskIDs.Count() == 0 {
|
||||
return r
|
||||
}
|
||||
} else {
|
||||
log.Error("rollerPool Get fail", "pk", pubkeys[idx.Int64()], "idx", idx.Int64(), "pk len", len(pubkeys))
|
||||
}
|
||||
pubkeys[idx.Int64()], pubkeys = pubkeys[0], pubkeys[1:]
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ create table l1_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l1_message_hash_uindex
|
||||
on l1_message (msg_hash);
|
||||
|
||||
@@ -21,7 +21,7 @@ create table l2_message
|
||||
);
|
||||
|
||||
comment
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed';
|
||||
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
|
||||
|
||||
create unique index l2_message_hash_uindex
|
||||
on l2_message (msg_hash);
|
||||
|
||||
@@ -236,8 +236,8 @@ func (o *blockBatchOrm) BatchRecordExist(id string) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetPendingBatches() ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC`, RollupPending)
|
||||
func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, RollupPending, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -260,7 +260,7 @@ func (o *blockBatchOrm) GetPendingBatches() ([]string, error) {
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
|
||||
row := o.db.QueryRowx(`SELECT * FROM block_batch WHERE rollup_status = $1 OR rollup_status = $2 ORDER BY index DESC;`, RollupFinalized, RollupFinalizationSkipped)
|
||||
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, RollupFinalized)
|
||||
batch := &BlockBatch{}
|
||||
if err := row.StructScan(batch); err != nil {
|
||||
return nil, err
|
||||
@@ -268,8 +268,8 @@ func (o *blockBatchOrm) GetLatestFinalizedBatch() (*BlockBatch, error) {
|
||||
return batch, nil
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) GetCommittedBatches() ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC`, RollupCommitted)
|
||||
func (o *blockBatchOrm) GetCommittedBatches(limit uint64) ([]string, error) {
|
||||
rows, err := o.db.Queryx(`SELECT id FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, RollupCommitted, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -305,7 +305,7 @@ func (o *blockBatchOrm) GetRollupStatusByIDList(ids []string) ([]RollupStatus, e
|
||||
return make([]RollupStatus, 0), nil
|
||||
}
|
||||
|
||||
query, args, err := sqlx.In("SELECT rollup_status FROM block_batch WHERE id IN (?);", ids)
|
||||
query, args, err := sqlx.In("SELECT id, rollup_status FROM block_batch WHERE id IN (?);", ids)
|
||||
if err != nil {
|
||||
return make([]RollupStatus, 0), err
|
||||
}
|
||||
@@ -314,17 +314,24 @@ func (o *blockBatchOrm) GetRollupStatusByIDList(ids []string) ([]RollupStatus, e
|
||||
|
||||
rows, err := o.db.Query(query, args...)
|
||||
|
||||
var statuses []RollupStatus
|
||||
statusMap := make(map[string]RollupStatus)
|
||||
for rows.Next() {
|
||||
var id string
|
||||
var status RollupStatus
|
||||
if err = rows.Scan(&status); err != nil {
|
||||
if err = rows.Scan(&id, &status); err != nil {
|
||||
break
|
||||
}
|
||||
statuses = append(statuses, status)
|
||||
statusMap[id] = status
|
||||
}
|
||||
var statuses []RollupStatus
|
||||
if err != nil {
|
||||
return statuses, err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
statuses = append(statuses, statusMap[id])
|
||||
}
|
||||
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
@@ -399,3 +406,17 @@ func (o *blockBatchOrm) GetAssignedBatchIDs() ([]string, error) {
|
||||
|
||||
return ids, rows.Close()
|
||||
}
|
||||
|
||||
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
|
||||
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), RollupFinalizationSkipped, ProvingTaskSkipped, ProvingTaskFailed, RollupCommitted)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
count, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
@@ -28,6 +28,9 @@ const (
|
||||
|
||||
// MsgFailed represents the from_layer message status is failed
|
||||
MsgFailed
|
||||
|
||||
// MsgExpired represents the from_layer message status is expired
|
||||
MsgExpired
|
||||
)
|
||||
|
||||
// L1Message is structure of stored layer1 bridge message
|
||||
@@ -142,30 +145,34 @@ type BlockBatchOrm interface {
|
||||
ResetProvingStatusFor(before ProvingStatus) error
|
||||
NewBatchInDBTx(dbTx *sqlx.Tx, startBlock *BlockInfo, endBlock *BlockInfo, parentHash string, totalTxNum uint64, gasUsed uint64) (string, error)
|
||||
BatchRecordExist(id string) (bool, error)
|
||||
GetPendingBatches() ([]string, error)
|
||||
GetCommittedBatches() ([]string, error)
|
||||
GetPendingBatches(limit uint64) ([]string, error)
|
||||
GetCommittedBatches(limit uint64) ([]string, error)
|
||||
GetRollupStatus(id string) (RollupStatus, error)
|
||||
GetRollupStatusByIDList(ids []string) ([]RollupStatus, error)
|
||||
GetCommitTxHash(id string) (sql.NullString, error)
|
||||
GetFinalizeTxHash(id string) (sql.NullString, error)
|
||||
GetLatestFinalizedBatch() (*BlockBatch, error)
|
||||
UpdateRollupStatus(ctx context.Context, id string, status RollupStatus) error
|
||||
UpdateCommitTxHashAndRollupStatus(ctx context.Context, id string, commitTxHash string, status RollupStatus) error
|
||||
UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, id string, finalizeTxHash string, status RollupStatus) error
|
||||
GetAssignedBatchIDs() ([]string, error)
|
||||
UpdateSkippedBatches() (int64, error)
|
||||
|
||||
GetCommitTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
GetFinalizeTxHash(id string) (sql.NullString, error) // for unit tests only
|
||||
}
|
||||
|
||||
// L1MessageOrm is layer1 message db interface
|
||||
type L1MessageOrm interface {
|
||||
GetL1MessageByNonce(nonce uint64) (*L1Message, error)
|
||||
GetL1MessageByMsgHash(msgHash string) (*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error)
|
||||
GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error)
|
||||
GetL1ProcessedNonce() (int64, error)
|
||||
SaveL1Messages(ctx context.Context, messages []*L1Message) error
|
||||
UpdateLayer2Hash(ctx context.Context, msgHash string, layer2Hash string) error
|
||||
UpdateLayer1Status(ctx context.Context, msgHash string, status MsgStatus) error
|
||||
UpdateLayer1StatusAndLayer2Hash(ctx context.Context, msgHash string, status MsgStatus, layer2Hash string) error
|
||||
GetLayer1LatestWatchedHeight() (int64, error)
|
||||
|
||||
GetRelayL1MessageTxHash(nonce uint64) (sql.NullString, error) // for unit tests only
|
||||
}
|
||||
|
||||
// L2MessageOrm is layer2 message db interface
|
||||
@@ -174,8 +181,7 @@ type L2MessageOrm interface {
|
||||
GetL2MessageByMsgHash(msgHash string) (*L2Message, error)
|
||||
MessageProofExist(nonce uint64) (bool, error)
|
||||
GetMessageProofByNonce(nonce uint64) (string, error)
|
||||
GetL2MessagesByStatus(status MsgStatus) ([]*L2Message, error)
|
||||
GetL2MessagesByStatusUpToHeight(status MsgStatus, height uint64) ([]*L2Message, error)
|
||||
GetL2Messages(fields map[string]interface{}, args ...string) ([]*L2Message, error)
|
||||
GetL2ProcessedNonce() (int64, error)
|
||||
SaveL2Messages(ctx context.Context, messages []*L2Message) error
|
||||
UpdateLayer1Hash(ctx context.Context, msgHash string, layer1Hash string) error
|
||||
@@ -183,4 +189,6 @@ type L2MessageOrm interface {
|
||||
UpdateLayer2StatusAndLayer1Hash(ctx context.Context, msgHash string, status MsgStatus, layer1Hash string) error
|
||||
UpdateMessageProof(ctx context.Context, nonce uint64, proof string) error
|
||||
GetLayer2LatestWatchedHeight() (int64, error)
|
||||
|
||||
GetRelayL2MessageTxHash(nonce uint64) (sql.NullString, error) // for unit tests only
|
||||
}
|
||||
|
||||
@@ -45,8 +45,8 @@ func (m *l1MessageOrm) GetL1MessageByNonce(nonce uint64) (*L1Message, error) {
|
||||
}
|
||||
|
||||
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC;`, status)
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status MsgStatus, limit uint64) ([]*L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY nonce ASC LIMIT $2;`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -167,3 +167,12 @@ func (m *l1MessageOrm) GetLayer1LatestWatchedHeight() (int64, error) {
|
||||
}
|
||||
return -1, nil
|
||||
}
|
||||
|
||||
func (m *l1MessageOrm) GetRelayL1MessageTxHash(nonce uint64) (sql.NullString, error) {
|
||||
row := m.db.QueryRow(`SELECT layer2_hash FROM l1_message WHERE nonce = $1`, nonce)
|
||||
var hash sql.NullString
|
||||
if err := row.Scan(&hash); err != nil {
|
||||
return sql.NullString{}, err
|
||||
}
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
@@ -88,32 +89,15 @@ func (m *layer2MessageOrm) GetL2ProcessedNonce() (int64, error) {
|
||||
}
|
||||
|
||||
// GetL2MessagesByStatus fetch list of messages given msg status
|
||||
func (m *layer2MessageOrm) GetL2MessagesByStatus(status MsgStatus) ([]*L2Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash FROM l2_message WHERE status = $1 ORDER BY nonce ASC;`, status)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*L2Message, error) {
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash FROM l2_message WHERE 1 = 1 "
|
||||
for key := range fields {
|
||||
query += fmt.Sprintf("AND %s=:%s ", key, key)
|
||||
}
|
||||
query = strings.Join(append([]string{query}, args...), " ")
|
||||
|
||||
var msgs []*L2Message
|
||||
for rows.Next() {
|
||||
msg := &L2Message{}
|
||||
if err = rows.StructScan(&msg); err != nil {
|
||||
break
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
|
||||
// log.Warn("no unprocessed layer2 messages in db", "err", err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msgs, rows.Close()
|
||||
}
|
||||
|
||||
// GetL2MessagesByStatusUpToHeight fetch list of messages given msg status and an upper limit on height
|
||||
func (m *layer2MessageOrm) GetL2MessagesByStatusUpToHeight(status MsgStatus, height uint64) ([]*L2Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer2_hash FROM l2_message WHERE status = $1 AND height <= $2 ORDER BY nonce ASC;`, status, height)
|
||||
db := m.db
|
||||
rows, err := db.NamedQuery(db.Rebind(query), fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -222,3 +206,12 @@ func (m *layer2MessageOrm) GetLayer2LatestWatchedHeight() (int64, error) {
|
||||
}
|
||||
return height, nil
|
||||
}
|
||||
|
||||
func (m *layer2MessageOrm) GetRelayL2MessageTxHash(nonce uint64) (sql.NullString, error) {
|
||||
row := m.db.QueryRow(`SELECT layer1_hash FROM l2_message WHERE nonce = $1`, nonce)
|
||||
var hash sql.NullString
|
||||
if err := row.Scan(&hash); err != nil {
|
||||
return sql.NullString{}, err
|
||||
}
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
@@ -281,7 +281,7 @@ func testOrmBlockBatch(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int(2), len(batches))
|
||||
|
||||
batcheIDs, err := ormBatch.GetPendingBatches()
|
||||
batcheIDs, err := ormBatch.GetPendingBatches(10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int(2), len(batcheIDs))
|
||||
assert.Equal(t, batchID1, batcheIDs[0])
|
||||
@@ -290,7 +290,7 @@ func testOrmBlockBatch(t *testing.T) {
|
||||
err = ormBatch.UpdateCommitTxHashAndRollupStatus(context.Background(), batchID1, "commit_tx_1", orm.RollupCommitted)
|
||||
assert.NoError(t, err)
|
||||
|
||||
batcheIDs, err = ormBatch.GetPendingBatches()
|
||||
batcheIDs, err = ormBatch.GetPendingBatches(10)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int(1), len(batcheIDs))
|
||||
assert.Equal(t, batchID2, batcheIDs[0])
|
||||
@@ -317,6 +317,24 @@ func testOrmBlockBatch(t *testing.T) {
|
||||
result, err := ormBatch.GetLatestFinalizedBatch()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, batchID1, result.ID)
|
||||
|
||||
status1, err := ormBatch.GetRollupStatus(batchID1)
|
||||
assert.NoError(t, err)
|
||||
status2, err := ormBatch.GetRollupStatus(batchID2)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, status1, status2)
|
||||
statues, err := ormBatch.GetRollupStatusByIDList([]string{batchID1, batchID2, batchID1, batchID2})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, statues[0], status1)
|
||||
assert.Equal(t, statues[1], status2)
|
||||
assert.Equal(t, statues[2], status1)
|
||||
assert.Equal(t, statues[3], status2)
|
||||
statues, err = ormBatch.GetRollupStatusByIDList([]string{batchID2, batchID1, batchID2, batchID1})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, statues[0], status2)
|
||||
assert.Equal(t, statues[1], status1)
|
||||
assert.Equal(t, statues[2], status2)
|
||||
assert.Equal(t, statues[3], status1)
|
||||
}
|
||||
|
||||
// testOrmSessionInfo test rollup result table functions
|
||||
|
||||
@@ -11,7 +11,7 @@ endif
|
||||
|
||||
libzkp:
|
||||
cd ../common/libzkp/impl && cargo build --release && cp ./target/release/libzkp.a ../interface/
|
||||
cp -r ../common/libzkp/interface ./prover/lib
|
||||
rm -rf ./prover/lib && cp -r ../common/libzkp/interface ./prover/lib
|
||||
|
||||
roller: libzkp ## Build the Roller instance.
|
||||
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/roller ./cmd
|
||||
|
||||
Reference in New Issue
Block a user