This commit is contained in:
Morty
2025-06-03 01:22:13 +08:00
parent 4bf419693e
commit 385d97cb49
8 changed files with 114 additions and 123 deletions

View File

@@ -2,11 +2,14 @@
-- +goose StatementBegin
CREATE TABLE blob_upload (
batch_index BIGINT NOT NULL,
batch_index BIGINT NOT NULL,
platform TEXT NOT NULL,
status SMALLINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT now(),
platform SMALLINT NOT NULL,
status SMALLINT NOT NULL,
-- metadata
updated_at TIMESTAMP NOT NULL DEFAULT now(),
deleted_at TIMESTAMP(0) DEFAULT NULL
PRIMARY KEY (batch_index, platform),
FOREIGN KEY (batch_index) REFERENCES batch(index)
@@ -14,6 +17,18 @@ CREATE TABLE blob_upload (
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_updated_at ON blob_upload(updated_at) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
-- +goose StatementEnd
-- +goose Down

View File

@@ -1,20 +0,0 @@
-- +goose Up
-- +goose StatementBegin
-- Add index on status for faster filtering by status
CREATE INDEX idx_blob_upload_status ON blob_upload(status);
-- Add index on updated_at for faster sorting and filtering by time
CREATE INDEX idx_blob_upload_updated_at ON blob_upload(updated_at);
-- Add index on (batch_index, status) for faster filtering by both fields
CREATE INDEX idx_blob_upload_batch_index_status ON blob_upload(batch_index, status);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP INDEX IF EXISTS idx_blob_upload_status;
DROP INDEX IF EXISTS idx_blob_upload_updated_at;
DROP INDEX IF EXISTS idx_blob_upload_batch_index_status;
-- +goose StatementEnd

View File

@@ -11,6 +11,7 @@ mock_abi:
rollup_bins: ## Builds the Rollup bins.
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
go build -o $(PWD)/build/bin/blob_uploader ./cmd/blob_uploader/
gas_oracle: ## Builds the gas_oracle bin
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
@@ -18,6 +19,9 @@ gas_oracle: ## Builds the gas_oracle bin
rollup_relayer: ## Builds the rollup_relayer bin
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
blob_uploader: ## Builds the blob_uploader bin
go build -o $(PWD)/build/bin/blob_uploader ./cmd/blob_uploader/
test:
go test -v -race -coverprofile=coverage.txt -covermode=atomic -p 1 $(PWD)/...

View File

@@ -9,7 +9,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
@@ -19,9 +18,7 @@ import (
"scroll-tech/common/version"
"scroll-tech/rollup/internal/config"
"scroll-tech/rollup/internal/controller/relayer"
"scroll-tech/rollup/internal/controller/watcher"
rutils "scroll-tech/rollup/internal/utils"
"scroll-tech/rollup/internal/controller/blob_uploader"
)
var app *cli.App
@@ -67,36 +64,12 @@ func action(ctx *cli.Context) error {
registry := prometheus.DefaultRegisterer
observability.Server(ctx, db)
// Init l2geth connection
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
}
genesisPath := ctx.String(utils.Genesis.Name)
genesis, err := utils.ReadGenesis(genesisPath)
if err != nil {
log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err)
}
// sanity check config
if cfg.L2Config.RelayerConfig.BatchSubmission == nil {
log.Crit("cfg.L2Config.RelayerConfig.BatchSubmission must not be nil")
}
if cfg.L2Config.RelayerConfig.BatchSubmission.MinBatches < 1 {
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MinBatches must be at least 1")
}
if cfg.L2Config.RelayerConfig.BatchSubmission.MaxBatches < 1 {
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MaxBatches must be at least 1")
}
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
log.Crit("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
}
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
log.Crit("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
if cfg.L2Config.BlobUploaderConfig == nil {
log.Crit("cfg.L2Config.BlobUploaderConfig must not be nil")
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, genesis.Config, relayer.ServiceTypeL2RollupRelayer, registry)
blobUploader, err := blob_uploader.NewBlobUploader(ctx.Context, db, cfg.L2Config.BlobUploaderConfig, registry)
if err != nil {
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
}
@@ -106,31 +79,7 @@ func action(ctx *cli.Context) error {
log.Crit("min codec version must be greater than or equal to CodecV7", "minCodecVersion", minCodecVersion)
}
chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, genesis.Config, db, registry)
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, minCodecVersion, genesis.Config, db, registry)
bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, minCodecVersion, genesis.Config, db, registry)
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry)
// Watcher loop to fetch missing blocks
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
number, loopErr := rutils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
if loopErr != nil {
log.Error("failed to get block number", "err", loopErr)
return
}
l2watcher.TryFetchRunningMissingBlocks(number)
})
go utils.Loop(subCtx, time.Duration(cfg.L2Config.ChunkProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, chunkProposer.TryProposeChunk)
go utils.Loop(subCtx, time.Duration(cfg.L2Config.BatchProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, batchProposer.TryProposeBatch)
go utils.Loop(subCtx, 10*time.Second, bundleProposer.TryProposeBundle)
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)
go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessPendingBundles)
go utils.Loop(subCtx, 2*time.Second, blobUploader.UploadBlobToS3)
// Finish start all blob-uploader functions.
log.Info("Start blob-uploader successfully", "version", version.Version)
@@ -145,7 +94,7 @@ func action(ctx *cli.Context) error {
return nil
}
// Run rollup relayer cmd instance.
// Run blob uploader cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)

View File

@@ -104,6 +104,15 @@
"bundle_proposer_config": {
"max_batch_num_per_bundle": 20,
"bundle_timeout_sec": 36000
},
"blob_uploader_config": {
"start_batch": 0,
"aws_s3_config": {
"bucket": "blob-data",
"region": "us-west-2",
"access_key": "ACCESSKEY",
"secret_key": "SECRETKEY"
}
}
},
"db_config": {

View File

@@ -51,7 +51,8 @@ type BundleProposerConfig struct {
// BlobUploaderConfig loads blob_uploader configuration items.
type BlobUploaderConfig struct {
AWSS3Config *AWSS3Config `json:"aws_s3_config"`
StartBatch uint64 `json:"start_batch"`
AWSS3Config *AWSS3Config `json:"aws_s3_config"`
}
// AWSS3Config loads s3_uploader configuration items.
@@ -60,4 +61,4 @@ type AWSS3Config struct {
Region string `json:"region"`
AccessKey string `json:"access_key"`
SecretKey string `json:"secret_key"`
}
}

View File

@@ -25,9 +25,11 @@ type BlobUploader struct {
cfg *config.BlobUploaderConfig
s3Uploader *S3Uploader
batchOrm *orm.Batch
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
blobUploadOrm *orm.BlobUpload
batchOrm *orm.Batch
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
metrics *blobUploaderMetrics
}
@@ -44,12 +46,13 @@ func NewBlobUploader(ctx context.Context, db *gorm.DB, cfg *config.BlobUploaderC
}
blobUploader := &BlobUploader{
ctx: ctx,
cfg: cfg,
s3Uploader: s3Uploader,
batchOrm: orm.NewBatch(db),
chunkOrm: orm.NewChunk(db),
l2BlockOrm: orm.NewL2Block(db),
ctx: ctx,
cfg: cfg,
s3Uploader: s3Uploader,
batchOrm: orm.NewBatch(db),
chunkOrm: orm.NewChunk(db),
l2BlockOrm: orm.NewL2Block(db),
blobUploadOrm: orm.NewBlobUpload(db),
}
blobUploader.metrics = initblobUploaderMetrics(reg)
@@ -85,10 +88,10 @@ func (b *BlobUploader) UploadBlobToS3() {
return
}
// calculate versioned blob hash
// calculate versioned blob hash
versionedBlobHash, err := utils.CalculateVersionedBlobHash(*blob)
if err != nil {
log.Error("failed to versioned blob hash", "batch index", dbBatch.Index, "err", err)
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
return
}
@@ -97,11 +100,21 @@ func (b *BlobUploader) UploadBlobToS3() {
err = b.s3Uploader.UploadData(b.ctx, blob[:], key)
if err != nil {
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
// Update status to failed
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); err != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", err)
}
return
}
// update db status
// Update status to uploaded
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
return
}
b.metrics.rollupBlobUploaderUploadToS3Total.Inc()
log.Info("Successfully uploaded blob to S3", "batch index", dbBatch.Index, "versioned blob hash", key)
}
func (b *BlobUploader) constructBlobCodecV7(dbBatch *orm.Batch) (*kzg4844.Blob, error) {

View File

@@ -6,6 +6,7 @@ import (
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"scroll-tech/common/types"
)
@@ -15,7 +16,7 @@ type BlobUpload struct {
db *gorm.DB `gorm:"-"`
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
Platform string `json:"platform" gorm:"column:platform;primaryKey"`
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
Status int16 `json:"status" gorm:"column:status"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
}
@@ -31,15 +32,19 @@ func (*BlobUpload) TableName() string {
}
// InsertBlobUpload inserts a new blob upload record into the database.
func (o *BlobUpload) InsertBlobUpload(ctx context.Context, batchIndex uint64, platform string, status types.BlobUploadStatus) error {
func (o *BlobUpload) InsertBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
blobUpload := &BlobUpload{
BatchIndex: batchIndex,
Platform: platform,
Platform: int16(platform),
Status: int16(status),
UpdatedAt: time.Now(),
}
db := o.db.WithContext(ctx)
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
if err := db.Create(blobUpload).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
}
@@ -47,8 +52,12 @@ func (o *BlobUpload) InsertBlobUpload(ctx context.Context, batchIndex uint64, pl
}
// UpdateBlobUploadStatus updates the status of a blob upload record.
func (o *BlobUpload) UpdateBlobUploadStatus(ctx context.Context, batchIndex uint64, platform string, status types.BlobUploadStatus) error {
db := o.db.WithContext(ctx)
func (o *BlobUpload) UpdateBlobUploadStatus(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("batch_index = ? AND platform = ?", batchIndex, platform)
@@ -63,8 +72,30 @@ func (o *BlobUpload) UpdateBlobUploadStatus(ctx context.Context, batchIndex uint
return nil
}
// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one.
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
	// Run inside the caller-supplied transaction when one is given; otherwise
	// fall back to the ORM's own connection.
	tx := o.db
	if len(dbTX) > 0 && dbTX[0] != nil {
		tx = dbTX[0]
	}

	record := BlobUpload{
		BatchIndex: batchIndex,
		Platform:   int16(platform),
		Status:     int16(status),
		UpdatedAt:  time.Now(),
	}

	// Upsert keyed on the composite primary key (batch_index, platform): on
	// conflict only status and updated_at are refreshed.
	upsert := clause.OnConflict{
		Columns:   []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
		DoUpdates: clause.AssignmentColumns([]string{"status", "updated_at"}),
	}
	if err := tx.WithContext(ctx).Clauses(upsert).Create(&record).Error; err != nil {
		return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
	}
	return nil
}
// GetBlobUploadByBatchIndexAndPlatform retrieves a blob upload record by batch index and platform.
func (o *BlobUpload) GetBlobUploadByBatchIndexAndPlatform(ctx context.Context, batchIndex uint64, platform string) (*BlobUpload, error) {
func (o *BlobUpload) GetBlobUploadByBatchIndexAndPlatform(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform) (*BlobUpload, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("batch_index = ? AND platform = ?", batchIndex, platform)
@@ -79,42 +110,31 @@ func (o *BlobUpload) GetBlobUploadByBatchIndexAndPlatform(ctx context.Context, b
return &blobUpload, nil
}
// GetPendingBlobUploads retrieves all pending blob upload records.
func (o *BlobUpload) GetPendingBlobUploads(ctx context.Context) ([]*BlobUpload, error) {
// GetPendingBlobUploadsByPlatform retrieves all pending blob upload records by platform.
func (o *BlobUpload) GetPendingBlobUploadsByPlatform(ctx context.Context, platform types.BlobStoragePlatform) ([]*BlobUpload, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("status = ?", types.BlobUploadStatusPending)
db = db.Where("status = ? AND platform = ?", types.BlobUploadStatusPending, platform)
db = db.Order("batch_index ASC")
var blobUploads []*BlobUpload
if err := db.Find(&blobUploads).Error; err != nil {
return nil, fmt.Errorf("BlobUpload.GetPendingBlobUploads error: %w", err)
return nil, fmt.Errorf("BlobUpload.GetPendingBlobUploadsByPlatform error: %w", err)
}
return blobUploads, nil
}
// GetFailedBlobUploads retrieves all failed blob upload records.
func (o *BlobUpload) GetFailedBlobUploads(ctx context.Context) ([]*BlobUpload, error) {
// GetFailedBlobUploadsByPlatform retrieves all failed blob upload records by platform.
func (o *BlobUpload) GetFailedBlobUploadsByPlatform(ctx context.Context, platform types.BlobStoragePlatform) ([]*BlobUpload, error) {
db := o.db.WithContext(ctx)
db = db.Model(&BlobUpload{})
db = db.Where("status = ?", types.BlobUploadStatusFailed)
db = db.Where("status = ? AND platform = ?", types.BlobUploadStatusFailed, platform)
db = db.Order("batch_index ASC")
var blobUploads []*BlobUpload
if err := db.Find(&blobUploads).Error; err != nil {
return nil, fmt.Errorf("BlobUpload.GetFailedBlobUploads error: %w", err)
return nil, fmt.Errorf("BlobUpload.GetFailedBlobUploadsByPlatform error: %w", err)
}
return blobUploads, nil
}
// DeleteBlobUpload deletes a blob upload record.
func (o *BlobUpload) DeleteBlobUpload(ctx context.Context, batchIndex uint64, platform string) error {
	// Delete the row identified by the composite key (batch_index, platform).
	result := o.db.WithContext(ctx).
		Model(&BlobUpload{}).
		Where("batch_index = ? AND platform = ?", batchIndex, platform).
		Delete(&BlobUpload{})
	if result.Error != nil {
		return fmt.Errorf("BlobUpload.DeleteBlobUpload error: %w, batch index: %v, platform: %v", result.Error, batchIndex, platform)
	}
	return nil
}