Add batches check.

This commit is contained in:
maskpp
2023-03-07 09:44:15 +08:00
parent 856af6735d
commit 08d707dfd2
8 changed files with 191 additions and 42 deletions

View File

@@ -100,10 +100,6 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
stopCh: make(chan struct{}),
}
go relayer.confirmLoop(ctx)
// Deal with broken transactions.
if err = relayer.prepare(); err != nil {
return nil, err
}
return relayer, nil
}
@@ -123,6 +119,11 @@ func (r *Layer1Relayer) prepare() error {
// Start the relayer process
func (r *Layer1Relayer) Start() {
// Deal with broken transactions.
if err := r.prepare(); err != nil {
log.Crit("failed to init layer1 transaction messages")
}
loop := func(ctx context.Context, f func()) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

View File

@@ -32,18 +32,19 @@ func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*B
// Otherwise, there will be a race condition between watcher.initializeGenesis and relayer.ProcessPendingBatches.
watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.L2MessengerAddress, cfg.L2MessageQueueAddress, orm)
proposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, orm)
relayer, err := NewLayer2Relayer(ctx, client, orm, cfg.RelayerConfig)
if err != nil {
return nil, err
}
batchProposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, relayer, orm)
proposer.SetLayer2Relayer(relayer)
relayer.SetBatchProposer(proposer)
return &Backend{
cfg: cfg,
watcher: watcher,
relayer: relayer,
batchProposer: batchProposer,
batchProposer: proposer,
orm: orm,
}, nil
}

View File

@@ -19,6 +19,10 @@ import (
"scroll-tech/bridge/config"
)
// layer2Interface is the slice of the layer2 relayer's API that the batch
// proposer depends on: submitting a commit transaction for a group of batches.
// Declared at the consumer so the relayer can be swapped or mocked in tests
// (wired in via SetLayer2Relayer).
type layer2Interface interface {
	SendCommitTx(batchData []*types.BatchData) error
}
// AddBatchInfoToDB inserts the batch information to the BlockBatch table and updates the batch_hash
// in all blocks included in the batch.
func AddBatchInfoToDB(db database.OrmFactory, batchData *types.BatchData) error {
@@ -70,7 +74,8 @@ type BatchProposer struct {
proofGenerationFreq uint64
batchDataBuffer []*types.BatchData
relayer *Layer2Relayer
layer2Interface
piCfg *types.PublicInputHashConfig
@@ -78,8 +83,8 @@ type BatchProposer struct {
}
// NewBatchProposer will return a new instance of BatchProposer.
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *Layer2Relayer, orm database.OrmFactory) *BatchProposer {
p := &BatchProposer{
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, orm database.OrmFactory) *BatchProposer {
return &BatchProposer{
mutex: sync.Mutex{},
ctx: ctx,
orm: orm,
@@ -92,21 +97,23 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, rela
batchDataBufferSizeLimit: 100*cfg.CommitTxCalldataSizeLimit + 1*1024*1024, // @todo: determine the value.
proofGenerationFreq: cfg.ProofGenerationFreq,
piCfg: cfg.PublicInputConfig,
relayer: relayer,
stopCh: make(chan struct{}),
}
}
// SetLayer2Relayer injects the relayer implementation the proposer uses to
// send commit transactions (embedded as layer2Interface).
// NOTE(review): the assignment is unsynchronized — presumably this is only
// called once during setup, before Start; confirm against callers.
func (p *BatchProposer) SetLayer2Relayer(relayer layer2Interface) {
	p.layer2Interface = relayer
}
// Start the Listening process
func (p *BatchProposer) Start() {
// for graceful restart.
p.recoverBatchDataBuffer()
// try to commit the leftover pending batches
p.tryCommitBatches()
return p
}
// Start the Listening process
func (p *BatchProposer) Start() {
go func() {
if reflect.ValueOf(p.orm).IsNil() {
panic("must run BatchProposer with DB")
@@ -197,7 +204,7 @@ func (p *BatchProposer) recoverBatchDataBuffer() {
continue
}
batchData, err := p.generateBatchData(parentBatch, blockInfos)
batchData, err := p.GenerateBatchData(parentBatch, blockInfos)
if err != nil {
continue
}
@@ -264,7 +271,7 @@ func (p *BatchProposer) tryCommitBatches() {
// Send commit tx for batchDataBuffer[0:index]
log.Info("Commit batches", "start_index", p.batchDataBuffer[0].Batch.BatchIndex,
"end_index", p.batchDataBuffer[index-1].Batch.BatchIndex)
err := p.relayer.SendCommitTx(p.batchDataBuffer[:index])
err := p.SendCommitTx(p.batchDataBuffer[:index])
if err != nil {
// leave the retry to the next ticker
log.Error("SendCommitTx failed", "error", err)
@@ -327,7 +334,7 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
return err
}
batchData, err := p.generateBatchData(lastBatch, blocks)
batchData, err := p.GenerateBatchData(lastBatch, blocks)
if err != nil {
log.Error("createBatchData failed", "error", err)
return err
@@ -342,7 +349,8 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
return nil
}
func (p *BatchProposer) generateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
// GenerateBatchData as a public function can be called by layer2 relayer.
func (p *BatchProposer) GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
var traces []*geth_types.BlockTrace
for _, block := range blocks {
trs, err := p.orm.GetL2BlockTraces(map[string]interface{}{"hash": block.Hash})

View File

@@ -41,7 +41,10 @@ func testBatchProposerProposeBatch(t *testing.T) {
BatchTxNumThreshold: 135,
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, relayer, db)
}, db)
relayer.SetBatchProposer(proposer)
proposer.SetLayer2Relayer(relayer)
proposer.tryProposeBatch()
infos, err := db.GetUnbatchedL2Blocks(map[string]interface{}{},
@@ -85,13 +88,14 @@ func testBatchProposerGracefulRestart(t *testing.T) {
assert.Equal(t, 1, len(batchHashes))
assert.Equal(t, batchData2.Hash().Hex(), batchHashes[0])
// test p.recoverBatchDataBuffer().
_ = NewBatchProposer(context.Background(), &config.BatchProposerConfig{
proposer := NewBatchProposer(context.Background(), &config.BatchProposerConfig{
ProofGenerationFreq: 1,
BatchGasThreshold: 3000000,
BatchTxNumThreshold: 135,
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, relayer, db)
}, db)
proposer.SetLayer2Relayer(relayer)
batchHashes, err = db.GetPendingBatches(math.MaxInt32)
assert.NoError(t, err)

View File

@@ -29,6 +29,10 @@ const (
defaultGasPriceDiff = 50000 // 5%
)
// batchInterface is the slice of the batch proposer's API that the layer2
// relayer depends on: regenerating the batch data for blocks already recorded
// in the DB. Declared at the consumer so the proposer can be mocked in tests
// (checkRollupBatches tolerates a nil implementation for exactly that reason).
type batchInterface interface {
	GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error)
}
// Layer2Relayer is responsible for
// 1. Committing and finalizing L2 blocks on L1
// 2. Relaying messages from L2 to L1
@@ -71,6 +75,9 @@ type Layer2Relayer struct {
// key(string): confirmation ID, value(string): batch hash.
processingFinalization sync.Map
// Use batch_proposer's GenerateBatchData interface.
batchInterface
stopCh chan struct{}
}
@@ -133,13 +140,15 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
stopCh: make(chan struct{}),
}
go relayer.confirmLoop(ctx)
if err = relayer.prepare(); err != nil {
return nil, err
}
return relayer, nil
}
// SetBatchProposer injects the batch proposer implementation the relayer uses
// to regenerate batch data while recovering in-flight commitments (embedded
// as batchInterface).
// NOTE(review): the assignment is unsynchronized — presumably setup-time
// wiring before Start; confirm against callers.
func (r *Layer2Relayer) SetBatchProposer(proposer batchInterface) {
	r.batchInterface = proposer
}
func (r *Layer2Relayer) prepare() error {
var eg errgroup.Group
eg.Go(func() error {
@@ -162,12 +171,26 @@ func (r *Layer2Relayer) prepare() error {
})
return nil
})
eg.Go(func() error {
if err := r.checkRollupBatches(); err != nil {
log.Error("failed to init layer2 rollupCommitting messages", "err", err)
return err
}
utils.TryTimes(-1, func() bool {
return r.rollupSender.PendingCount() == 0
})
return nil
})
return eg.Wait()
}
// Start the relayer process
func (r *Layer2Relayer) Start() {
if err := r.prepare(); err != nil {
log.Crit("failed to init layer2 transaction messages")
}
loop := func(ctx context.Context, f func()) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

View File

@@ -2,24 +2,115 @@ package l2
import (
"errors"
"fmt"
"math/big"
"github.com/modern-go/reflect2"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/log"
"modernc.org/mathutil"
"scroll-tech/common/types"
"scroll-tech/common/utils"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/sender"
)
// SendCommitTx sends commitBatches tx to L1.
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
if len(batchData) == 0 {
log.Error("SendCommitTx receives empty batch")
// checkRollupBatches recovers commit transactions that were interrupted
// mid-flight: it scans block batches stuck in RollupCommitting status,
// regenerates their batch data via the injected batchInterface, and re-loads
// (or re-sends) the corresponding commit tx so the confirmation loop can pick
// it up again.
//
// Batches are processed in ascending index order, one commit-tx group per
// iteration of the BEGIN loop, until no committing batch remains. Returns the
// first DB/packing/sending error encountered, or nil when recovery completes.
func (r *Layer2Relayer) checkRollupBatches() error {
	// The batch-data generator is injected via SetBatchProposer; skip the
	// recovery pass entirely when it is absent (e.g. mocked tests).
	if reflect2.IsNil(r.batchInterface) {
		return nil
	}

	// Cache parent batches so repeated lookups of the same hash hit the DB once.
	blockBatchCache := make(map[string]*types.BlockBatch)
	getBlockBatch := func(batchHash string) (*types.BlockBatch, error) {
		if blockBatch, ok := blockBatchCache[batchHash]; ok {
			return blockBatch, nil
		}
		blockBatches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": batchHash})
		if err != nil {
			return nil, err
		}
		if len(blockBatches) == 0 {
			return nil, fmt.Errorf("no such blockBatch, batchHash: %s", batchHash)
		}
		blockBatchCache[batchHash] = blockBatches[0]
		return blockBatches[0], nil
	}

	var batchIndex uint64
BEGIN:
	// Fetch the next group of committing batches that share a single commit
	// tx, strictly above the highest batch index handled so far.
	blockBatches, err := r.db.GetBlockBatches(
		map[string]interface{}{"rollup_status": types.RollupCommitting},
		fmt.Sprintf("AND commit_tx_hash IN (SELECT commit_tx_hash FROM block_batch WHERE index > %d GROUP BY commit_tx_hash LIMIT 1)", batchIndex),
		fmt.Sprintf("AND index > %d", batchIndex),
		"ORDER BY index ASC",
	)
	if err != nil || len(blockBatches) == 0 {
		// err is nil on the normal "nothing left to recover" exit.
		return err
	}
	var batchDataBuffer []*types.BatchData
	for _, blockBatch := range blockBatches {
		batchIndex = mathutil.MaxUint64(batchIndex, blockBatch.Index)
		var (
			parentBatch *types.BlockBatch
			blockInfos  []*types.BlockInfo
		)
		parentBatch, err = getBlockBatch(blockBatch.ParentHash)
		if err != nil {
			return err
		}
		blockInfos, err = r.db.GetL2BlockInfos(
			map[string]interface{}{"batch_hash": blockBatch.Hash},
			"order by number ASC",
		)
		if err != nil {
			return err
		}
		// Skip (rather than abort on) batches whose block rows are incomplete:
		// they cannot be regenerated faithfully.
		if len(blockInfos) != int(blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1) {
			log.Error("the number of block infos retrieved from DB mismatches the blockBatch info in the DB",
				"len(blockInfos)", len(blockInfos),
				"expected", blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1)
			continue
		}
		var batchData *types.BatchData
		batchData, err = r.GenerateBatchData(parentBatch, blockInfos)
		if err != nil {
			return err
		}
		batchDataBuffer = append(batchDataBuffer, batchData)
	}
	// Every candidate in this group was skipped above; batchIndex has already
	// advanced past it, so move on instead of packing an empty buffer.
	if len(batchDataBuffer) == 0 {
		goto BEGIN
	}

	batchHashes, txID, callData, err := r.packBatchData(batchDataBuffer)
	if err != nil {
		return err
	}
	// Wait until the sender's pending queue has room (retries forever).
	utils.TryTimes(-1, func() bool {
		return !r.rollupSender.IsFull()
	})
	// Re-attach to the recorded commit tx if it is still pending, otherwise
	// send a fresh transaction under the same txID.
	err = r.rollupSender.LoadOrSendTx(
		common.HexToHash(blockBatches[0].CommitTxHash.String),
		txID,
		&r.cfg.RollupContractAddress,
		big.NewInt(0),
		callData,
	)
	if err != nil {
		log.Error("failed to load or send batchData tx", "err", err)
		return err
	}
	r.processingBatchesCommitment.Store(txID, batchHashes)
	goto BEGIN
}
func (r *Layer2Relayer) packBatchData(batchData []*types.BatchData) ([]string, string, []byte, error) {
// pack calldata
commitBatches := make([]bridge_abi.IScrollChainBatch, len(batchData))
for i, batch := range batchData {
@@ -31,15 +122,38 @@ func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
"error", err,
"start_batch_index", commitBatches[0].BatchIndex,
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
return err
return nil, "", nil, err
}
// generate a unique txID and send transaction
var bytes []byte
for _, batch := range batchData {
var (
bytes []byte
batchHashes = make([]string, len(batchData))
)
for i, batch := range batchData {
bytes = append(bytes, batch.Hash().Bytes()...)
batchHashes[i] = batch.Hash().Hex()
}
txID := crypto.Keccak256Hash(bytes).String()
return batchHashes, crypto.Keccak256Hash(bytes).String(), calldata, nil
}
// SendCommitTx sends commitBatches tx to L1.
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
if len(batchData) == 0 {
log.Error("SendCommitTx receives empty batch")
return nil
}
// pack calldata
batchHashes, txID, calldata, err := r.packBatchData(batchData)
if err != nil {
log.Error("Failed to pack commitBatches",
"error", err,
"start_batch_index", batchData[0].Batch.BatchIndex,
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
return err
}
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
@@ -49,16 +163,14 @@ func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
}
log.Info("Sent the commitBatches tx to layer1",
"tx_hash", txHash.Hex(),
"start_batch_index", commitBatches[0].BatchIndex,
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
"start_batch_index", batchData[0].Batch.BatchIndex,
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
// record and sync with db, @todo handle db error
batchHashes := make([]string, len(batchData))
for i, batch := range batchData {
batchHashes[i] = batch.Hash().Hex()
for i := range batchData {
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHashes[i], txHash.String(), types.RollupCommitting)
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batch.Batch.BatchIndex, "err", err)
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batchData[i].Batch.BatchIndex, "err", err)
}
}
r.processingBatchesCommitment.Store(txID, batchHashes)

View File

@@ -11,6 +11,6 @@ if [ ! -n "${IPC_PATH}" ];then
IPC_PATH="/tmp/l1geth_path.ipc"
fi
exec geth --mine --datadir "." --unlock 0 --password "./password" --allow-insecure-unlock --nodiscover \
exec geth --mine --datadir "." --unlock 0 --miner.etherbase "0x1c5a77d9fa7ef466951b2f01f724bca3a5820b63" --password "./password" --allow-insecure-unlock --nodiscover \
--http --http.addr "0.0.0.0" --http.port 8545 --ws --ws.addr "0.0.0.0" --ws.port 8546 --ipcpath ${IPC_PATH}

View File

@@ -26,7 +26,7 @@ func NewL1MessageOrm(db *sqlx.DB) L1MessageOrm {
// GetL1Messages get l1 messages by k-v map and args.
func (m *l1MessageOrm) GetL1Messages(fields map[string]interface{}, args ...string) ([]*types.L1Message, error) {
query := "SELECT nonce, msg_hash, height, sender, target, value, fee, gas_limit, deadline, calldata, layer1_hash, layer2_hash, status FROM l2_message WHERE 1 = 1 "
query := "SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE 1 = 1 "
for key := range fields {
query += fmt.Sprintf(" AND %s=:%s ", key, key)
}