Compare commits

32 Commits

Author    SHA1        Message  Date
yiweichi  19b59b9003  chore: auto version bump [bot]  2025-06-10 08:45:40 +00:00
Morty     b6e7c3ea62  fix: use unique key  2025-06-09 17:25:46 +08:00
Morty     4c988a9b7a  fix: address comments  2025-06-09 16:46:05 +08:00
Morty     4c5a1bf6d8  fix: address comments  2025-06-08 21:44:33 +08:00
Morty     66ebecfc4d  feat: add blob-uploader docker ci  2025-06-06 01:56:23 +08:00
Morty     6d08239f2d  fix: logs  2025-06-06 01:44:42 +08:00
Morty     8ac3181018  fix: typo  2025-06-06 01:24:58 +08:00
Morty     c4c765fd7b  perfect logs  2025-06-06 01:23:21 +08:00
Morty     bf8a149146  fix: typo  2025-06-06 00:51:56 +08:00
Morty     94c8e4a0aa  fix: ci  2025-06-06 00:41:49 +08:00
Morty     8964e3f644  Merge branch 'develop' into feat-add-blob-storage-service  2025-06-06 00:03:47 +08:00
Morty     1ba262a2ad  fix: remove left join  2025-06-06 00:03:30 +08:00
Morty     4f9a98efe2  fix: address comments  2025-06-05 00:14:55 +08:00
Morty     a434b2c736  Apply suggestions from code review (Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>)  2025-06-04 15:30:51 +08:00
Morty     0570525570  Update rollup/cmd/blob_uploader/app/app.go (Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>)  2025-06-04 15:26:16 +08:00
Morty     1d8a48ad30  fix: constructBlobCodec  2025-06-04 03:48:09 +08:00
Morty     ef88fef62a  fix: address comments  2025-06-04 02:25:04 +08:00
Morty     b1a8ba73c6  fix: ci  2025-06-04 02:11:08 +08:00
Morty     030742cb78  feat: support multi codec version  2025-06-04 02:08:29 +08:00
Morty     03410b72d2  fix: database test  2025-06-03 20:18:11 +08:00
Morty     b6bf8052e3  fix: ci  2025-06-03 20:11:54 +08:00
Morty     6a499fa30f  rm logs  2025-06-03 05:24:52 +08:00
Morty     d623baf792  fix: GetFirstUnuploadedAndFailedBatch  2025-06-03 04:47:57 +08:00
Morty     e84a2b8557  debug logs  2025-06-03 04:12:52 +08:00
Morty     6c68f3c9c6  debug logs  2025-06-03 04:10:19 +08:00
Morty     a8c913fec4  fix: database migrate  2025-06-03 04:04:56 +08:00
Morty     b9fa8c8cb3  fix: database migrate  2025-06-03 03:52:49 +08:00
Morty     e8e7fb15a6  fix: app.go  2025-06-03 02:33:16 +08:00
Morty     2acce16ff2  feat: add start batch  2025-06-03 01:33:04 +08:00
Morty     385d97cb49  chores  2025-06-03 01:22:13 +08:00
Morty     4bf419693e  feat: add blob uploader aws s3  2025-05-31 06:45:10 +08:00
Morty     9aee0d6e6e  feat: add blob uploader service  2025-05-30 05:13:03 +08:00
7 changed files with 69 additions and 146 deletions

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.5.24"
var tag = "v4.5.22"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -3,7 +3,6 @@
CREATE TABLE blob_upload (
batch_index BIGINT NOT NULL,
batch_hash VARCHAR NOT NULL,
platform SMALLINT NOT NULL,
status SMALLINT NOT NULL,
@@ -14,15 +13,20 @@ CREATE TABLE blob_upload (
deleted_at TIMESTAMP(0) DEFAULT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex
ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL;
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex
ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL;
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform
ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
-- +goose StatementEnd
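
Note on the status column: the Go hunks below refer to named constants such as types.BlobUploadStatusUploaded and types.BlobUploadStatusFailed, but their definitions live outside these seven files. The sketch below is an illustrative reconstruction of how such a SMALLINT-backed enum is commonly declared in Go; the type name mirrors the one used in the diff, and the numeric values are inferred only from the column comment 'undefined, pending, uploaded, failed' above, not taken from the repository.

package main

import "fmt"

// BlobUploadStatus is an illustrative reconstruction of the enum stored in the
// SMALLINT status column. The real constants live in scroll-tech/common/types;
// the values below only follow the order given in the column comment.
type BlobUploadStatus int16

const (
	BlobUploadStatusUndefined BlobUploadStatus = iota // 0
	BlobUploadStatusPending                           // 1
	BlobUploadStatusUploaded                          // 2
	BlobUploadStatusFailed                            // 3
)

func (s BlobUploadStatus) String() string {
	switch s {
	case BlobUploadStatusPending:
		return "pending"
	case BlobUploadStatusUploaded:
		return "uploaded"
	case BlobUploadStatusFailed:
		return "failed"
	default:
		return "undefined"
	}
}

func main() {
	fmt.Println(BlobUploadStatusUploaded, int16(BlobUploadStatusUploaded)) // uploaded 2
}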

View File

@@ -16,7 +16,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.16.0
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601
github.com/smartystreets/goconvey v1.8.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0

View File

@@ -287,8 +287,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1 h1:6aKqJSal+QVdB5HMWMs0JTbAIZ6/iAHJx9qizz0w9dU=
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1/go.mod h1:yhTS9OVC0xQGhg7DN5iV5KZJvnSIlFWAxDdp+6jxQtY=
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65 h1:idsnkl5rwVr7eNUB0HxdkKI0L3zbTm8XSGwMTB5ndws=
github.com/scroll-tech/go-ethereum v1.10.14-0.20250610090337-00c5c2bd3e65/go.mod h1:756YMENiSfx/5pCwKq3+uSTWjXuHTbiCB+TirJjsQT8=
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601 h1:NEsjCG6uSvLRBlsP3+x6PL1kM+Ojs3g8UGotIPgJSz8=
github.com/scroll-tech/go-ethereum v1.10.14-0.20250305151038-478940e79601/go.mod h1:OblWe1+QrZwdpwO0j/LY3BSGuKT3YPUFBDQQgvvfStQ=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=

View File

@@ -2,7 +2,6 @@ package blob_uploader
import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
@@ -68,7 +67,7 @@ func (b *BlobUploader) UploadBlobToS3() {
}
// get un-uploaded batches from database in ascending order by their index.
dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
if err != nil {
log.Error("Failed to fetch unuploaded batch", "err", err)
return
@@ -86,7 +85,7 @@ func (b *BlobUploader) UploadBlobToS3() {
if err != nil {
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
@@ -98,7 +97,7 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
@@ -111,14 +110,14 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
}
// update status to uploaded
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
return
@@ -196,56 +195,3 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er
return daBatch.Blob(), nil
}
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
// The batch must have a commit_tx_hash (committed).
func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) {
batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform)
if err != nil {
return nil, err
}
var batch *orm.Batch
for {
var err error
batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
return nil, nil
}
return nil, err
}
// to check if the parent batch uploaded
// if no, there is a batch revert happened, we need to fallback to upload previous batch
// skip the check if the parent batch is genesis batch
if batchIndex <= 1 || batchIndex == startBatch {
break
}
fields := map[string]interface{}{
"batch_index = ?": batchIndex - 1,
"batch_hash = ?": batch.ParentBatchHash,
"platform = ?": platform,
"status = ?": types.BlobUploadStatusUploaded,
}
blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1)
if err != nil {
return nil, err
}
if len(blobUpload) == 0 {
batchIndex--
continue
}
break
}
if len(batch.CommitTxHash) == 0 {
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
return nil, nil
}
return batch, nil
}
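
Two behaviours sit side by side in this file's diff. The variant that keeps GetFirstUnuploadedBatchByPlatform on the BlobUploader controller walks backwards through earlier batch indices whenever the parent batch is not yet marked uploaded for the platform (its comments describe this as falling back after a batch revert), while the variant that calls b.batchOrm.GetFirstUnuploadedBatchByPlatform drops that loop and delegates batch selection to the Batch ORM shown in the next file. The following stand-alone sketch captures the walk-back rule, with the parent-hash match simplified to an "is the previous index uploaded" lookup; fallbackCandidate and the sample data are illustrative, not repository code.

package main

import "fmt"

// fallbackCandidate mirrors the walk-back loop of the controller-side
// GetFirstUnuploadedBatchByPlatform: starting from candidate, step back one
// batch at a time while the previous batch has not been uploaded, stopping at
// startBatch or at the genesis boundary (index <= 1). The real code also
// matches the parent batch hash, which is omitted here.
func fallbackCandidate(candidate, startBatch uint64, uploaded func(index uint64) bool) uint64 {
	for candidate > 1 && candidate != startBatch && !uploaded(candidate-1) {
		candidate--
	}
	return candidate
}

func main() {
	uploadedSet := map[uint64]bool{7: true} // highest uploaded batch is 7
	uploaded := func(i uint64) bool { return uploadedSet[i] }

	// Suppose a revert re-proposed batches 8..10: the naive candidate 10 falls
	// back to 8, the first batch whose parent (7) is already uploaded.
	fmt.Println(fallbackCandidate(10, 3, uploaded)) // 8
	// When the parent is already uploaded, the candidate is kept as-is.
	fmt.Println(fallbackCandidate(8, 3, uploaded)) // 8
}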

View File

@@ -263,6 +263,44 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro
return &batch, nil
}
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
// The batch must have a commit_tx_hash (committed).
func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
db = db.Order("batch_index DESC")
db = db.Limit(1)
var blobUpload BlobUpload
var batchIndex uint64
if err := db.First(&blobUpload).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
batchIndex = startBatch
} else {
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
}
} else {
batchIndex = blobUpload.BatchIndex + 1
}
batch, err := o.GetBatchByIndex(ctx, batchIndex)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
return nil, nil
}
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
}
if len(batch.CommitTxHash) == 0 {
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
return nil, nil
}
return batch, nil
}
// InsertBatch inserts a new batch into the database.
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) {
if batch == nil {
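
Batch.GetFirstUnuploadedBatchByPlatform above folds the lookup into one rule: resume from the highest batch_index already marked uploaded for the platform plus one, fall back to startBatch when nothing has been uploaded yet, and return nil, nil both when that candidate batch has not been proposed and when it has no commit_tx_hash yet. A minimal stand-alone sketch of the same rule over in-memory data (batchInfo and nextBatchToUpload are illustrative names, not part of the repository):

package main

import "fmt"

// batchInfo is an illustrative stand-in for the orm.Batch fields this rule cares about.
type batchInfo struct {
	Index        uint64
	CommitTxHash string
}

// nextBatchToUpload mirrors the selection rule of Batch.GetFirstUnuploadedBatchByPlatform
// over in-memory data: lastUploaded is the highest batch_index already marked uploaded for
// the platform (nil when there is none) and batches maps batch_index to batch. It returns
// nil when the candidate batch has not been proposed yet or has not been committed yet.
func nextBatchToUpload(lastUploaded *uint64, startBatch uint64, batches map[uint64]batchInfo) *batchInfo {
	candidate := startBatch
	if lastUploaded != nil {
		candidate = *lastUploaded + 1
	}
	b, ok := batches[candidate]
	if !ok || b.CommitTxHash == "" {
		return nil
	}
	return &b
}

func main() {
	batches := map[uint64]batchInfo{
		10: {Index: 10, CommitTxHash: "0xabc"},
		11: {Index: 11}, // proposed but not yet committed
	}

	last := uint64(9)
	fmt.Println(nextBatchToUpload(&last, 5, batches)) // &{10 0xabc}

	last = 10
	fmt.Println(nextBatchToUpload(&last, 5, batches)) // <nil>, batch 11 is not committed

	fmt.Println(nextBatchToUpload(nil, 5, batches)) // <nil>, start batch 5 is not proposed
}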

View File

@@ -2,11 +2,11 @@ package orm
import (
"context"
"errors"
"fmt"
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"scroll-tech/common/types"
)
@@ -16,9 +16,8 @@ type BlobUpload struct {
db *gorm.DB `gorm:"-"`
// blob upload
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
Platform int16 `json:"platform" gorm:"column:platform"`
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
Status int16 `json:"status" gorm:"column:status"`
// metadata
@@ -37,87 +36,23 @@ func (*BlobUpload) TableName() string {
return "blob_upload"
}
// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service
func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
db = db.Order("batch_index DESC")
db = db.Limit(1)
var blobUpload BlobUpload
var batchIndex uint64
if err := db.First(&blobUpload).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
batchIndex = startBatch
} else {
return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err)
}
} else {
batchIndex = blobUpload.BatchIndex + 1
}
return batchIndex, nil
}
// GetBlobUpload retrieves the selected blob uploads from the database.
func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
for key, value := range fields {
db = db.Where(key, value)
}
for _, orderBy := range orderByList {
db = db.Order(orderBy)
}
if limit > 0 {
db = db.Limit(limit)
}
db = db.Order("batch_index ASC")
var blobUploads []*BlobUpload
if err := db.Find(&blobUploads).Error; err != nil {
return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err)
}
return blobUploads, nil
}
// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one.
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
var existing BlobUpload
err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL",
batchIndex, batchHash, int16(platform),
).First(&existing).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
newRecord := BlobUpload{
BatchIndex: batchIndex,
BatchHash: batchHash,
Platform: int16(platform),
Status: int16(status),
}
if err := db.Create(&newRecord).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
return nil
} else if err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
blobUpload := &BlobUpload{
BatchIndex: batchIndex,
Platform: int16(platform),
Status: int16(status),
}
if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
if err := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(blobUpload).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
}
return nil
}
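
The batchHash-free variant of InsertOrUpdateBlobUpload collapses the select-then-insert-or-update sequence into a single upsert: GORM's clause.OnConflict renders roughly INSERT ... ON CONFLICT (batch_index, platform) DO UPDATE SET status = excluded.status, which relies on (batch_index, platform) being unique, matching the batch_index_platform_uindex from the migration. Below is a minimal, self-contained sketch of that pattern; it is not the project's code: it swaps Postgres and the partial unique index for an in-memory SQLite database with a composite primary key, and trims the model to three columns.

package main

import (
	"fmt"
	"log"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// Minimal stand-in for the blob_upload row: the composite primary key on
// (batch_index, platform) plays the role of the partial unique index in the real schema.
type BlobUpload struct {
	BatchIndex uint64 `gorm:"column:batch_index;primaryKey;autoIncrement:false"`
	Platform   int16  `gorm:"column:platform;primaryKey;autoIncrement:false"`
	Status     int16  `gorm:"column:status"`
}

// upsertStatus emits roughly:
//   INSERT INTO blob_uploads (...) VALUES (...)
//   ON CONFLICT (batch_index, platform) DO UPDATE SET status = excluded.status
func upsertStatus(db *gorm.DB, batchIndex uint64, platform, status int16) error {
	return db.Clauses(clause.OnConflict{
		Columns:   []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
		DoUpdates: clause.AssignmentColumns([]string{"status"}),
	}).Create(&BlobUpload{BatchIndex: batchIndex, Platform: platform, Status: status}).Error
}

func main() {
	db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	if err := db.AutoMigrate(&BlobUpload{}); err != nil {
		log.Fatal(err)
	}

	const statusPending, statusUploaded int16 = 1, 2
	if err := upsertStatus(db, 42, 1, statusPending); err != nil { // first call inserts the row
		log.Fatal(err)
	}
	if err := upsertStatus(db, 42, 1, statusUploaded); err != nil { // second call only updates status
		log.Fatal(err)
	}

	var rows []BlobUpload
	db.Find(&rows)
	fmt.Println(rows) // [{42 1 2}]: a single row whose status was updated in place
}

One related design note: when the uniqueness comes from a Postgres partial unique index (WHERE deleted_at IS NULL, as in the migration above), the ON CONFLICT target generally has to repeat that predicate for Postgres to accept the index as the arbiter; recent GORM releases expose this through the TargetWhere field of clause.OnConflict.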