mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-13 16:08:04 -05:00
Compare commits
32 Commits
bump-versi
...
v4.5.22-rc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19b59b9003 | ||
|
|
b6e7c3ea62 | ||
|
|
4c988a9b7a | ||
|
|
4c5a1bf6d8 | ||
|
|
66ebecfc4d | ||
|
|
6d08239f2d | ||
|
|
8ac3181018 | ||
|
|
c4c765fd7b | ||
|
|
bf8a149146 | ||
|
|
94c8e4a0aa | ||
|
|
8964e3f644 | ||
|
|
1ba262a2ad | ||
|
|
4f9a98efe2 | ||
|
|
a434b2c736 | ||
|
|
0570525570 | ||
|
|
1d8a48ad30 | ||
|
|
ef88fef62a | ||
|
|
b1a8ba73c6 | ||
|
|
030742cb78 | ||
|
|
03410b72d2 | ||
|
|
b6bf8052e3 | ||
|
|
6a499fa30f | ||
|
|
d623baf792 | ||
|
|
e84a2b8557 | ||
|
|
6c68f3c9c6 | ||
|
|
a8c913fec4 | ||
|
|
b9fa8c8cb3 | ||
|
|
e8e7fb15a6 | ||
|
|
2acce16ff2 | ||
|
|
385d97cb49 | ||
|
|
4bf419693e | ||
|
|
9aee0d6e6e |
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v4.5.24"
|
||||
var tag = "v4.5.22"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
CREATE TABLE blob_upload (
|
||||
batch_index BIGINT NOT NULL,
|
||||
batch_hash VARCHAR NOT NULL,
|
||||
|
||||
platform SMALLINT NOT NULL,
|
||||
status SMALLINT NOT NULL,
|
||||
@@ -14,15 +13,20 @@ CREATE TABLE blob_upload (
|
||||
deleted_at TIMESTAMP(0) DEFAULT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex
|
||||
ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL;
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex
|
||||
ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL;
|
||||
|
||||
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform
|
||||
ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ require (
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/prometheus/client_golang v1.16.0
|
||||
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601
|
||||
github.com/smartystreets/goconvey v1.8.0
|
||||
github.com/spf13/viper v1.19.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
|
||||
@@ -287,8 +287,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
|
||||
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
|
||||
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1 h1:6aKqJSal+QVdB5HMWMs0JTbAIZ6/iAHJx9qizz0w9dU=
|
||||
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1/go.mod h1:yhTS9OVC0xQGhg7DN5iV5KZJvnSIlFWAxDdp+6jxQtY=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65 h1:idsnkl5rwVr7eNUB0HxdkKI0L3zbTm8XSGwMTB5ndws=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65/go.mod h1:756YMENiSfx/5pCwKq3+uSTWjXuHTbiCB+TirJjsQT8=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601 h1:NEsjCG6uSvLRBlsP3+x6PL1kM+Ojs3g8UGotIPgJSz8=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601/go.mod h1:OblWe1+QrZwdpwO0j/LY3BSGuKT3YPUFBDQQgvvfStQ=
|
||||
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
|
||||
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
|
||||
|
||||
@@ -2,7 +2,6 @@ package blob_uploader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -68,7 +67,7 @@ func (b *BlobUploader) UploadBlobToS3() {
|
||||
}
|
||||
|
||||
// get un-uploaded batches from database in ascending order by their index.
|
||||
dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
|
||||
dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unuploaded batch", "err", err)
|
||||
return
|
||||
@@ -86,7 +85,7 @@ func (b *BlobUploader) UploadBlobToS3() {
|
||||
if err != nil {
|
||||
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
|
||||
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
|
||||
}
|
||||
return
|
||||
@@ -98,7 +97,7 @@ func (b *BlobUploader) UploadBlobToS3() {
|
||||
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
|
||||
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
|
||||
// update status to failed
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
|
||||
}
|
||||
return
|
||||
@@ -111,14 +110,14 @@ func (b *BlobUploader) UploadBlobToS3() {
|
||||
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
|
||||
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
|
||||
// update status to failed
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
|
||||
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// update status to uploaded
|
||||
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
|
||||
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
|
||||
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
|
||||
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
|
||||
return
|
||||
@@ -196,56 +195,3 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er
|
||||
|
||||
return daBatch.Blob(), nil
|
||||
}
|
||||
|
||||
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
|
||||
// The batch must have a commit_tx_hash (committed).
|
||||
func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) {
|
||||
batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var batch *orm.Batch
|
||||
for {
|
||||
var err error
|
||||
batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// to check if the parent batch uploaded
|
||||
// if no, there is a batch revert happened, we need to fallback to upload previous batch
|
||||
// skip the check if the parent batch is genesis batch
|
||||
if batchIndex <= 1 || batchIndex == startBatch {
|
||||
break
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"batch_index = ?": batchIndex - 1,
|
||||
"batch_hash = ?": batch.ParentBatchHash,
|
||||
"platform = ?": platform,
|
||||
"status = ?": types.BlobUploadStatusUploaded,
|
||||
}
|
||||
blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(blobUpload) == 0 {
|
||||
batchIndex--
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
if len(batch.CommitTxHash) == 0 {
|
||||
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return batch, nil
|
||||
}
|
||||
|
||||
@@ -263,6 +263,44 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro
|
||||
return &batch, nil
|
||||
}
|
||||
|
||||
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
|
||||
// The batch must have a commit_tx_hash (committed).
|
||||
func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&BlobUpload{})
|
||||
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
|
||||
db = db.Order("batch_index DESC")
|
||||
db = db.Limit(1)
|
||||
|
||||
var blobUpload BlobUpload
|
||||
var batchIndex uint64
|
||||
if err := db.First(&blobUpload).Error; err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
batchIndex = startBatch
|
||||
} else {
|
||||
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
|
||||
}
|
||||
} else {
|
||||
batchIndex = blobUpload.BatchIndex + 1
|
||||
}
|
||||
|
||||
batch, err := o.GetBatchByIndex(ctx, batchIndex)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
|
||||
}
|
||||
|
||||
if len(batch.CommitTxHash) == 0 {
|
||||
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return batch, nil
|
||||
}
|
||||
|
||||
// InsertBatch inserts a new batch into the database.
|
||||
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) {
|
||||
if batch == nil {
|
||||
|
||||
@@ -2,11 +2,11 @@ package orm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
)
|
||||
@@ -16,9 +16,8 @@ type BlobUpload struct {
|
||||
db *gorm.DB `gorm:"-"`
|
||||
|
||||
// blob upload
|
||||
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
|
||||
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
|
||||
Platform int16 `json:"platform" gorm:"column:platform"`
|
||||
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
|
||||
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
|
||||
Status int16 `json:"status" gorm:"column:status"`
|
||||
|
||||
// metadata
|
||||
@@ -37,87 +36,23 @@ func (*BlobUpload) TableName() string {
|
||||
return "blob_upload"
|
||||
}
|
||||
|
||||
// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service
|
||||
func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&BlobUpload{})
|
||||
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
|
||||
db = db.Order("batch_index DESC")
|
||||
db = db.Limit(1)
|
||||
|
||||
var blobUpload BlobUpload
|
||||
var batchIndex uint64
|
||||
if err := db.First(&blobUpload).Error; err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
batchIndex = startBatch
|
||||
} else {
|
||||
return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err)
|
||||
}
|
||||
} else {
|
||||
batchIndex = blobUpload.BatchIndex + 1
|
||||
}
|
||||
|
||||
return batchIndex, nil
|
||||
}
|
||||
|
||||
// GetBlobUpload retrieves the selected blob uploads from the database.
|
||||
func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&BlobUpload{})
|
||||
|
||||
for key, value := range fields {
|
||||
db = db.Where(key, value)
|
||||
}
|
||||
|
||||
for _, orderBy := range orderByList {
|
||||
db = db.Order(orderBy)
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
db = db.Limit(limit)
|
||||
}
|
||||
|
||||
db = db.Order("batch_index ASC")
|
||||
|
||||
var blobUploads []*BlobUpload
|
||||
if err := db.Find(&blobUploads).Error; err != nil {
|
||||
return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err)
|
||||
}
|
||||
|
||||
return blobUploads, nil
|
||||
}
|
||||
|
||||
// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one.
|
||||
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
|
||||
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
db = db.WithContext(ctx)
|
||||
|
||||
var existing BlobUpload
|
||||
err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL",
|
||||
batchIndex, batchHash, int16(platform),
|
||||
).First(&existing).Error
|
||||
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
newRecord := BlobUpload{
|
||||
BatchIndex: batchIndex,
|
||||
BatchHash: batchHash,
|
||||
Platform: int16(platform),
|
||||
Status: int16(status),
|
||||
}
|
||||
if err := db.Create(&newRecord).Error; err != nil {
|
||||
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
|
||||
}
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
|
||||
blobUpload := &BlobUpload{
|
||||
BatchIndex: batchIndex,
|
||||
Platform: int16(platform),
|
||||
Status: int16(status),
|
||||
}
|
||||
|
||||
if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil {
|
||||
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
|
||||
if err := db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"status"}),
|
||||
}).Create(blobUpload).Error; err != nil {
|
||||
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user