mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-14 00:18:03 -05:00
391 lines
13 KiB
Go
391 lines
13 KiB
Go
package orm
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/scroll-tech/go-ethereum/log"
|
|
|
|
"scroll-tech/common/types"
|
|
)
|
|
|
|
type blockBatchOrm struct {
|
|
db *sqlx.DB
|
|
}
|
|
|
|
var _ BlockBatchOrm = (*blockBatchOrm)(nil)
|
|
|
|
// NewBlockBatchOrm create an blockBatchOrm instance
|
|
func NewBlockBatchOrm(db *sqlx.DB) BlockBatchOrm {
|
|
return &blockBatchOrm{db: db}
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetBlockBatches(fields map[string]interface{}, args ...string) ([]*types.BlockBatch, error) {
|
|
query := "SELECT * FROM block_batch WHERE 1 = 1 "
|
|
for key := range fields {
|
|
query += fmt.Sprintf("AND %s=:%s ", key, key)
|
|
}
|
|
query = strings.Join(append([]string{query}, args...), " ")
|
|
|
|
db := o.db
|
|
rows, err := db.NamedQuery(db.Rebind(query), fields)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var batches []*types.BlockBatch
|
|
for rows.Next() {
|
|
batch := &types.BlockBatch{}
|
|
if err = rows.StructScan(batch); err != nil {
|
|
break
|
|
}
|
|
batches = append(batches, batch)
|
|
}
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return nil, err
|
|
}
|
|
|
|
return batches, rows.Close()
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetProvingStatusByHash(hash string) (types.ProvingStatus, error) {
|
|
row := o.db.QueryRow(`SELECT proving_status FROM block_batch WHERE hash = $1`, hash)
|
|
var status types.ProvingStatus
|
|
if err := row.Scan(&status); err != nil {
|
|
return types.ProvingStatusUndefined, err
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetVerifiedProofAndInstanceCommitmentsByHash(hash string) ([]byte, []byte, error) {
|
|
var proof []byte
|
|
var instanceCommitments []byte
|
|
row := o.db.QueryRow(`SELECT proof, instance_commitments FROM block_batch WHERE hash = $1 and proving_status = $2`, hash, types.ProvingTaskVerified)
|
|
|
|
if err := row.Scan(&proof, &instanceCommitments); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return proof, instanceCommitments, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateProofByHash(ctx context.Context, hash string, proof, instanceCommitments []byte, proofTimeSec uint64) error {
|
|
db := o.db
|
|
if _, err := db.ExecContext(ctx,
|
|
db.Rebind(`UPDATE block_batch set proof = ?, instance_commitments = ?, proof_time_sec = ? where hash = ?;`),
|
|
proof, instanceCommitments, proofTimeSec, hash,
|
|
); err != nil {
|
|
log.Error("failed to update proof", "err", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateProvingStatus(hash string, status types.ProvingStatus) error {
|
|
switch status {
|
|
case types.ProvingTaskAssigned:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set proving_status = ?, prover_assigned_at = ? where hash = ?;"), status, time.Now(), hash)
|
|
return err
|
|
case types.ProvingTaskUnassigned:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set proving_status = ?, prover_assigned_at = null where hash = ?;"), status, hash)
|
|
return err
|
|
case types.ProvingTaskProved, types.ProvingTaskVerified:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set proving_status = ?, proved_at = ? where hash = ?;"), status, time.Now(), hash)
|
|
return err
|
|
default:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set proving_status = ? where hash = ?;"), status, hash)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (o *blockBatchOrm) ResetProvingStatusFor(before types.ProvingStatus) error {
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set proving_status = ? where proving_status = ?;"), types.ProvingTaskUnassigned, before)
|
|
return err
|
|
}
|
|
|
|
// func (o *blockBatchOrm) NewBatchInDBTx(dbTx *sqlx.Tx, startBlock *BlockInfo, endBlock *BlockInfo, parentHash string, totalTxNum uint64, totalL2Gas uint64) (string, error) {
|
|
func (o *blockBatchOrm) NewBatchInDBTx(dbTx *sqlx.Tx, batchData *types.BatchData) error {
|
|
numBlocks := len(batchData.Batch.Blocks)
|
|
if _, err := dbTx.NamedExec(`INSERT INTO public.block_batch (hash, index, parent_hash, start_block_number, start_block_hash, end_block_number, end_block_hash, total_tx_num, total_l2_gas, state_root, total_l1_tx_num) VALUES (:hash, :index, :parent_hash, :start_block_number, :start_block_hash, :end_block_number, :end_block_hash, :total_tx_num, :total_l2_gas, :state_root, :total_l1_tx_num)`,
|
|
map[string]interface{}{
|
|
"hash": batchData.Hash().Hex(),
|
|
"index": batchData.Batch.BatchIndex,
|
|
"parent_hash": batchData.Batch.ParentBatchHash.Hex(),
|
|
"start_block_number": batchData.Batch.Blocks[0].BlockNumber,
|
|
"start_block_hash": batchData.Batch.Blocks[0].BlockHash.Hex(),
|
|
"end_block_number": batchData.Batch.Blocks[numBlocks-1].BlockNumber,
|
|
"end_block_hash": batchData.Batch.Blocks[numBlocks-1].BlockHash.Hex(),
|
|
"total_tx_num": batchData.TotalTxNum,
|
|
"total_l1_tx_num": batchData.TotalL1TxNum,
|
|
"total_l2_gas": batchData.TotalL2Gas,
|
|
"state_root": batchData.Batch.NewStateRoot.Hex(),
|
|
"created_at": time.Now(),
|
|
// "proving_status": ProvingTaskUnassigned, // actually no need, because we have default value in DB schema
|
|
// "rollup_status": RollupPending, // actually no need, because we have default value in DB schema
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) BatchRecordExist(hash string) (bool, error) {
|
|
var res int
|
|
err := o.db.QueryRow(o.db.Rebind(`SELECT 1 FROM block_batch where hash = ? limit 1;`), hash).Scan(&res)
|
|
if err != nil {
|
|
if err != sql.ErrNoRows {
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetPendingBatches(limit uint64) ([]string, error) {
|
|
rows, err := o.db.Queryx(`SELECT hash FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, types.RollupPending, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var hashes []string
|
|
for rows.Next() {
|
|
var hash string
|
|
if err = rows.Scan(&hash); err != nil {
|
|
break
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
if len(hashes) == 0 || errors.Is(err, sql.ErrNoRows) {
|
|
// log.Warn("no pending batches in db", "err", err)
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return hashes, rows.Close()
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetLatestBatch() (*types.BlockBatch, error) {
|
|
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch);`)
|
|
batch := &types.BlockBatch{}
|
|
if err := row.StructScan(batch); err != nil {
|
|
return nil, err
|
|
}
|
|
return batch, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetLatestCommittedBatch() (*types.BlockBatch, error) {
|
|
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, types.RollupCommitted)
|
|
batch := &types.BlockBatch{}
|
|
if err := row.StructScan(batch); err != nil {
|
|
return nil, err
|
|
}
|
|
return batch, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetLatestFinalizedBatch() (*types.BlockBatch, error) {
|
|
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1);`, types.RollupFinalized)
|
|
batch := &types.BlockBatch{}
|
|
if err := row.StructScan(batch); err != nil {
|
|
return nil, err
|
|
}
|
|
return batch, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetLatestFinalizingOrFinalizedBatch() (*types.BlockBatch, error) {
|
|
row := o.db.QueryRowx(`select * from block_batch where index = (select max(index) from block_batch where rollup_status = $1 or rollup_status = $2);`, types.RollupFinalizing, types.RollupFinalized)
|
|
batch := &types.BlockBatch{}
|
|
if err := row.StructScan(batch); err != nil {
|
|
return nil, err
|
|
}
|
|
return batch, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetCommittedBatches(limit uint64) ([]string, error) {
|
|
rows, err := o.db.Queryx(`SELECT hash FROM block_batch WHERE rollup_status = $1 ORDER BY index ASC LIMIT $2`, types.RollupCommitted, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var hashes []string
|
|
for rows.Next() {
|
|
var hash string
|
|
if err = rows.Scan(&hash); err != nil {
|
|
break
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
if len(hashes) == 0 || errors.Is(err, sql.ErrNoRows) {
|
|
// log.Warn("no committed batches in db", "err", err)
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return hashes, rows.Close()
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetRollupStatus(hash string) (types.RollupStatus, error) {
|
|
row := o.db.QueryRow(`SELECT rollup_status FROM block_batch WHERE hash = $1`, hash)
|
|
var status types.RollupStatus
|
|
if err := row.Scan(&status); err != nil {
|
|
return types.RollupUndefined, err
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetRollupStatusByHashList(hashes []string) ([]types.RollupStatus, error) {
|
|
if len(hashes) == 0 {
|
|
return make([]types.RollupStatus, 0), nil
|
|
}
|
|
|
|
query, args, err := sqlx.In("SELECT hash, rollup_status FROM block_batch WHERE hash IN (?);", hashes)
|
|
if err != nil {
|
|
return make([]types.RollupStatus, 0), err
|
|
}
|
|
// sqlx.In returns queries with the `?` bindvar, we can rebind it for our backend
|
|
query = o.db.Rebind(query)
|
|
|
|
rows, err := o.db.Query(query, args...)
|
|
|
|
statusMap := make(map[string]types.RollupStatus)
|
|
for rows.Next() {
|
|
var hash string
|
|
var status types.RollupStatus
|
|
if err = rows.Scan(&hash, &status); err != nil {
|
|
break
|
|
}
|
|
statusMap[hash] = status
|
|
}
|
|
var statuses []types.RollupStatus
|
|
if err != nil {
|
|
return statuses, err
|
|
}
|
|
|
|
for _, hash := range hashes {
|
|
statuses = append(statuses, statusMap[hash])
|
|
}
|
|
|
|
return statuses, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetCommitTxHash(hash string) (sql.NullString, error) {
|
|
row := o.db.QueryRow(`SELECT commit_tx_hash FROM block_batch WHERE hash = $1`, hash)
|
|
var commitTXHash sql.NullString
|
|
if err := row.Scan(&commitTXHash); err != nil {
|
|
return sql.NullString{}, err
|
|
}
|
|
return commitTXHash, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetFinalizeTxHash(hash string) (sql.NullString, error) {
|
|
row := o.db.QueryRow(`SELECT finalize_tx_hash FROM block_batch WHERE hash = $1`, hash)
|
|
var finalizeTxHash sql.NullString
|
|
if err := row.Scan(&finalizeTxHash); err != nil {
|
|
return sql.NullString{}, err
|
|
}
|
|
return finalizeTxHash, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateRollupStatus(ctx context.Context, hash string, status types.RollupStatus) error {
|
|
switch status {
|
|
case types.RollupCommitted:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ?, committed_at = ? where hash = ?;"), status, time.Now(), hash)
|
|
return err
|
|
case types.RollupFinalized:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ?, finalized_at = ? where hash = ?;"), status, time.Now(), hash)
|
|
return err
|
|
default:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where hash = ?;"), status, hash)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateCommitTxHashAndRollupStatus(ctx context.Context, hash string, commitTxHash string, status types.RollupStatus) error {
|
|
switch status {
|
|
case types.RollupCommitted:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set commit_tx_hash = ?, rollup_status = ?, committed_at = ? where hash = ?;"), commitTxHash, status, time.Now(), hash)
|
|
return err
|
|
default:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set commit_tx_hash = ?, rollup_status = ? where hash = ?;"), commitTxHash, status, hash)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash string, finalizeTxHash string, status types.RollupStatus) error {
|
|
switch status {
|
|
case types.RollupFinalized:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set finalize_tx_hash = ?, rollup_status = ?, finalized_at = ? where hash = ?;"), finalizeTxHash, status, time.Now(), hash)
|
|
return err
|
|
default:
|
|
_, err := o.db.Exec(o.db.Rebind("update block_batch set finalize_tx_hash = ?, rollup_status = ? where hash = ?;"), finalizeTxHash, status, hash)
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetAssignedBatchHashes() ([]string, error) {
|
|
rows, err := o.db.Queryx(`SELECT hash FROM block_batch WHERE proving_status IN ($1, $2)`, types.ProvingTaskAssigned, types.ProvingTaskProved)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var hashes []string
|
|
for rows.Next() {
|
|
var hash string
|
|
if err = rows.Scan(&hash); err != nil {
|
|
break
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
return hashes, rows.Close()
|
|
}
|
|
|
|
func (o *blockBatchOrm) GetBatchCount() (int64, error) {
|
|
row := o.db.QueryRow(`select count(*) from block_batch`)
|
|
var count int64
|
|
if err := row.Scan(&count); err != nil {
|
|
return -1, err
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateSkippedBatches() (int64, error) {
|
|
res, err := o.db.Exec(o.db.Rebind("update block_batch set rollup_status = ? where (proving_status = ? or proving_status = ?) and rollup_status = ?;"), types.RollupFinalizationSkipped, types.ProvingTaskSkipped, types.ProvingTaskFailed, types.RollupCommitted)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return count, nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateL2OracleTxHash(ctx context.Context, hash, txHash string) error {
|
|
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update block_batch set oracle_tx_hash = ? where hash = ?;"), txHash, hash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateL2GasOracleStatus(ctx context.Context, hash string, status types.GasOracleStatus) error {
|
|
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update block_batch set oracle_status = ? where hash = ?;"), status, hash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *blockBatchOrm) UpdateL2GasOracleStatusAndOracleTxHash(ctx context.Context, hash string, status types.GasOracleStatus, txHash string) error {
|
|
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update block_batch set oracle_status = ?, oracle_tx_hash = ? where hash = ?;"), status, txHash, hash); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|