mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-13 07:57:58 -05:00
Co-authored-by: noelwei <fan@scroll.io> Co-authored-by: Rohit Narurkar <rohit.narurkar@proton.me> Co-authored-by: jonastheis <4181434+jonastheis@users.noreply.github.com> Co-authored-by: colinlyguo <colinlyguo@scroll.io> Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> Co-authored-by: Morty <70688412+yiweichi@users.noreply.github.com> Co-authored-by: omerfirmak <omerfirmak@users.noreply.github.com>
246 lines
9.8 KiB
Go
246 lines
9.8 KiB
Go
package orm
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/scroll-tech/go-ethereum/log"
|
|
"gorm.io/gorm"
|
|
|
|
"scroll-tech/common/types"
|
|
"scroll-tech/common/utils"
|
|
)
|
|
|
|
// Bundle represents a bundle of batches.
|
|
type Bundle struct {
|
|
db *gorm.DB `gorm:"column:-"`
|
|
|
|
Index uint64 `json:"index" gorm:"column:index"`
|
|
Hash string `json:"hash" gorm:"column:hash"`
|
|
StartBatchIndex uint64 `json:"start_batch_index" gorm:"column:start_batch_index"`
|
|
StartBatchHash string `json:"start_batch_hash" gorm:"column:start_batch_hash"`
|
|
EndBatchIndex uint64 `json:"end_batch_index" gorm:"column:end_batch_index"`
|
|
EndBatchHash string `json:"end_batch_hash" gorm:"column:end_batch_hash"`
|
|
|
|
// proof
|
|
BatchProofsStatus int16 `json:"batch_proofs_status" gorm:"column:batch_proofs_status;default:1"`
|
|
ProvingStatus int16 `json:"proving_status" gorm:"column:proving_status;default:1"`
|
|
Proof []byte `json:"proof" gorm:"column:proof;default:NULL"`
|
|
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
|
|
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
|
|
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
|
|
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
|
|
|
|
// rollup
|
|
RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
|
|
FinalizeTxHash string `json:"finalize_tx_hash" gorm:"column:finalize_tx_hash;default:NULL"`
|
|
FinalizedAt *time.Time `json:"finalized_at" gorm:"column:finalized_at;default:NULL"`
|
|
|
|
// metadata
|
|
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
|
|
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
|
|
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"column:deleted_at;default:NULL"`
|
|
}
|
|
|
|
// NewBundle creates a new Bundle database instance.
|
|
func NewBundle(db *gorm.DB) *Bundle {
|
|
return &Bundle{db: db}
|
|
}
|
|
|
|
// TableName returns the table name for the Bundle model.
|
|
func (*Bundle) TableName() string {
|
|
return "bundle"
|
|
}
|
|
|
|
// GetUnassignedBundle retrieves unassigned bundle based on the specified limit.
|
|
// The returned batch sorts in ascending order by their index.
|
|
func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) {
|
|
var bundle Bundle
|
|
db := o.db.WithContext(ctx)
|
|
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND codec_version != 5 AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;",
|
|
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady))
|
|
err := db.Raw(sql).Scan(&bundle).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Batch.GetUnassignedBundle error: %w", err)
|
|
}
|
|
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" {
|
|
return nil, nil
|
|
}
|
|
return &bundle, nil
|
|
}
|
|
|
|
// GetUnassignedBundleCount retrieves unassigned bundle count.
|
|
func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
|
|
var count int64
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
|
|
db = db.Where("total_attempts < ?", maxTotalAttempts)
|
|
db = db.Where("active_attempts < ?", maxActiveAttempts)
|
|
db = db.Where("batch_proofs_status = ?", int(types.BatchProofsStatusReady))
|
|
db = db.Where("codec_version != 5")
|
|
db = db.Where("bundle.deleted_at IS NULL")
|
|
if err := db.Count(&count).Error; err != nil {
|
|
return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: %w", err)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// GetAssignedBundle retrieves assigned bundle based on the specified limit.
|
|
// The returned bundle sorts in ascending order by their index.
|
|
func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) {
|
|
var bundle Bundle
|
|
db := o.db.WithContext(ctx)
|
|
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;",
|
|
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady))
|
|
err := db.Raw(sql).Scan(&bundle).Error
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Bundle.GetAssignedBatch error: %w", err)
|
|
}
|
|
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" {
|
|
return nil, nil
|
|
}
|
|
return &bundle, nil
|
|
}
|
|
|
|
// GetProvingStatusByHash retrieves the proving status of a bundle given its hash.
|
|
func (o *Bundle) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) {
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Select("proving_status")
|
|
db = db.Where("hash = ?", hash)
|
|
|
|
var bundle Bundle
|
|
if err := db.Find(&bundle).Error; err != nil {
|
|
return types.ProvingStatusUndefined, fmt.Errorf("Bundle.GetProvingStatusByHash error: %w, batch hash: %v", err, hash)
|
|
}
|
|
return types.ProvingStatus(bundle.ProvingStatus), nil
|
|
}
|
|
|
|
// GetBundleByHash retrieves the given
|
|
func (o *Bundle) GetBundleByHash(ctx context.Context, bundleHash string) (*Bundle, error) {
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash = ?", bundleHash)
|
|
|
|
var bundle Bundle
|
|
if err := db.First(&bundle).Error; err != nil {
|
|
return nil, fmt.Errorf("Bundle.GetBundleByHash error: %w, bundle hash: %v", err, bundleHash)
|
|
}
|
|
return &bundle, nil
|
|
}
|
|
|
|
// GetUnassignedAndBatchesUnreadyBundles get the bundles which is unassigned and batches are not ready
|
|
func (o *Bundle) GetUnassignedAndBatchesUnreadyBundles(ctx context.Context, offset, limit int) ([]*Bundle, error) {
|
|
if offset < 0 || limit < 0 {
|
|
return nil, errors.New("limit and offset must not be smaller than 0")
|
|
}
|
|
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
|
|
db = db.Where("batch_proofs_status = ?", types.BatchProofsStatusPending)
|
|
db = db.Order("index ASC")
|
|
db = db.Offset(offset)
|
|
db = db.Limit(limit)
|
|
|
|
var bundles []*Bundle
|
|
if err := db.Find(&bundles).Error; err != nil {
|
|
return nil, fmt.Errorf("Bundle.GetUnassignedAndBatchesUnreadyBundles error: %w", err)
|
|
}
|
|
return bundles, nil
|
|
}
|
|
|
|
// UpdateBatchProofsStatusByBundleHash updates the status of batch_proofs_status field for a given bundle hash.
|
|
func (o *Bundle) UpdateBatchProofsStatusByBundleHash(ctx context.Context, bundleHash string, status types.BatchProofsStatus) error {
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash = ?", bundleHash)
|
|
|
|
if err := db.Update("batch_proofs_status", status).Error; err != nil {
|
|
return fmt.Errorf("Bundle.UpdateBatchProofsStatusByBundleHash error: %w, bundle hash: %v, status: %v", err, bundleHash, status.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateProvingStatusFailed updates the proving status failed of a bundle.
|
|
func (o *Bundle) UpdateProvingStatusFailed(ctx context.Context, bundleHash string, maxAttempts uint8, dbTX ...*gorm.DB) error {
|
|
db := o.db
|
|
if len(dbTX) > 0 && dbTX[0] != nil {
|
|
db = dbTX[0]
|
|
}
|
|
db = db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash", bundleHash)
|
|
db = db.Where("total_attempts >= ?", maxAttempts)
|
|
db = db.Where("proving_status != ?", int(types.ProvingTaskVerified))
|
|
if err := db.Update("proving_status", int(types.ProvingTaskFailed)).Error; err != nil {
|
|
return fmt.Errorf("Bundle.UpdateProvingStatus error: %w, bundle hash: %v, status: %v", err, bundleHash, types.ProvingTaskFailed.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateProofAndProvingStatusByHash updates the bundle proof and proving status by hash.
|
|
func (o *Bundle) UpdateProofAndProvingStatusByHash(ctx context.Context, hash string, proof []byte, provingStatus types.ProvingStatus, proofTimeSec uint64, dbTX ...*gorm.DB) error {
|
|
db := o.db
|
|
if len(dbTX) > 0 && dbTX[0] != nil {
|
|
db = dbTX[0]
|
|
}
|
|
|
|
updateFields := make(map[string]interface{})
|
|
updateFields["proof"] = proof
|
|
updateFields["proving_status"] = provingStatus
|
|
updateFields["proof_time_sec"] = proofTimeSec
|
|
updateFields["proved_at"] = utils.NowUTC()
|
|
|
|
db = db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash", hash)
|
|
|
|
if err := db.Updates(updateFields).Error; err != nil {
|
|
return fmt.Errorf("Batch.UpdateProofByHash error: %w, batch hash: %v", err, hash)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateBundleAttempts atomically increments the attempts count for the earliest available bundle that meets the conditions.
|
|
func (o *Bundle) UpdateBundleAttempts(ctx context.Context, hash string, curActiveAttempts, curTotalAttempts int16) (int64, error) {
|
|
db := o.db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash = ?", hash)
|
|
db = db.Where("active_attempts = ?", curActiveAttempts)
|
|
db = db.Where("total_attempts = ?", curTotalAttempts)
|
|
result := db.Updates(map[string]interface{}{
|
|
"proving_status": types.ProvingTaskAssigned,
|
|
"total_attempts": gorm.Expr("total_attempts + 1"),
|
|
"active_attempts": gorm.Expr("active_attempts + 1"),
|
|
})
|
|
|
|
if result.Error != nil {
|
|
return 0, fmt.Errorf("failed to update bundle, err:%w", result.Error)
|
|
}
|
|
return result.RowsAffected, nil
|
|
}
|
|
|
|
// DecreaseActiveAttemptsByHash decrements the active_attempts of a bundle given its hash.
|
|
func (o *Bundle) DecreaseActiveAttemptsByHash(ctx context.Context, bundleHash string, dbTX ...*gorm.DB) error {
|
|
db := o.db
|
|
if len(dbTX) > 0 && dbTX[0] != nil {
|
|
db = dbTX[0]
|
|
}
|
|
db = db.WithContext(ctx)
|
|
db = db.Model(&Bundle{})
|
|
db = db.Where("hash = ?", bundleHash)
|
|
db = db.Where("proving_status != ?", int(types.ProvingTaskVerified))
|
|
db = db.Where("active_attempts > ?", 0)
|
|
result := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - 1"))
|
|
if result.Error != nil {
|
|
return fmt.Errorf("Bundle.DecreaseActiveAttemptsByHash error: %w, bundle hash: %v", result.Error, bundleHash)
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
log.Warn("No rows were affected in DecreaseActiveAttemptsByHash", "bundle hash", bundleHash)
|
|
}
|
|
return nil
|
|
}
|