feat(blob-uploader): blob_upload table add batch hash (#1677)

This commit is contained in:
Morty
2025-06-11 18:21:14 +08:00
committed by GitHub
parent 4ee459a602
commit 5d6b5a89f4
4 changed files with 142 additions and 65 deletions

View File

@@ -2,6 +2,7 @@ package blob_uploader
import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
@@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() {
}
// get un-uploaded batches from database in ascending order by their index.
dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
if err != nil {
log.Error("Failed to fetch unuploaded batch", "err", err)
return
@@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() {
if err != nil {
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
@@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
@@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() {
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
// update status to failed
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
}
return
}
// update status to uploaded
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
return
@@ -195,3 +196,56 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er
return daBatch.Blob(), nil
}
// GetFirstUnuploadedBatchByPlatform returns the first batch that still has to be uploaded to the given
// blob storage platform. The batch must have a commit_tx_hash (committed).
//
// The search starts at the index reported by blobUploadOrm.GetNextBatchIndexToUploadByPlatform and walks
// backwards for as long as no "uploaded" blob_upload record exists for the previous index with the
// candidate's parent batch hash. It returns (nil, nil) when the candidate batch does not exist yet or
// has no commit tx hash; any other lookup error is returned unchanged.
func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) {
	candidateIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform)
	if err != nil {
		return nil, err
	}

	var candidate *orm.Batch
	for {
		candidate, err = b.batchOrm.GetBatchByIndex(ctx, candidateIndex)
		if errors.Is(err, gorm.ErrRecordNotFound) {
			log.Debug("got batch not proposed for blob uploading", "batch_index", candidateIndex, "platform", platform.String())
			return nil, nil
		}
		if err != nil {
			return nil, err
		}

		// The parent check is skipped when the parent is the genesis batch or when the
		// candidate is the configured start batch: there is nothing earlier to fall back to.
		if candidateIndex <= 1 || candidateIndex == startBatch {
			break
		}

		// Look for an uploaded record of the parent, matched by both index and hash.
		// A missing record means a batch revert happened, so fall back to the previous batch.
		parentUploads, queryErr := b.blobUploadOrm.GetBlobUploads(ctx, map[string]interface{}{
			"batch_index = ?": candidateIndex - 1,
			"batch_hash = ?":  candidate.ParentBatchHash,
			"platform = ?":    platform,
			"status = ?":      types.BlobUploadStatusUploaded,
		}, nil, 1)
		if queryErr != nil {
			return nil, queryErr
		}
		if len(parentUploads) > 0 {
			break
		}
		candidateIndex--
	}

	if len(candidate.CommitTxHash) == 0 {
		log.Debug("got batch not committed for blob uploading", "batch_index", candidateIndex, "platform", platform.String())
		return nil, nil
	}
	return candidate, nil
}

View File

@@ -263,44 +263,6 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro
return &batch, nil
}
// GetFirstUnuploadedBatchByPlatform returns the batch that follows the highest batch index recorded as
// uploaded for the given platform, or the batch at startBatch when no uploaded record exists.
// The batch must have a commit_tx_hash (committed); (nil, nil) is returned when the batch does not
// exist yet or has not been committed.
func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) {
	latestUploadedQuery := o.db.WithContext(ctx).
		Model(&BlobUpload{}).
		Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded).
		Order("batch_index DESC").
		Limit(1)

	// Default to the configured start batch; overridden when an uploaded record is found.
	nextIndex := startBatch
	var latestUploaded BlobUpload
	switch lookupErr := latestUploadedQuery.First(&latestUploaded).Error; {
	case lookupErr == nil:
		nextIndex = latestUploaded.BatchIndex + 1
	case !errors.Is(lookupErr, gorm.ErrRecordNotFound):
		return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", lookupErr)
	}

	batch, err := o.GetBatchByIndex(ctx, nextIndex)
	if errors.Is(err, gorm.ErrRecordNotFound) {
		log.Debug("got batch not proposed for blob uploading", "batch_index", nextIndex, "platform", platform.String())
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
	}

	if len(batch.CommitTxHash) == 0 {
		log.Debug("got batch not committed for blob uploading", "batch_index", nextIndex, "platform", platform.String())
		return nil, nil
	}
	return batch, nil
}
// InsertBatch inserts a new batch into the database.
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) {
if batch == nil {

View File

@@ -2,11 +2,11 @@ package orm
import (
"context"
"errors"
"fmt"
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"scroll-tech/common/types"
)
@@ -16,8 +16,9 @@ type BlobUpload struct {
db *gorm.DB `gorm:"-"`
// blob upload
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
Platform int16 `json:"platform" gorm:"column:platform"`
Status int16 `json:"status" gorm:"column:status"`
// metadata
@@ -36,23 +37,87 @@ func (*BlobUpload) TableName() string {
return "blob_upload"
}
// GetNextBatchIndexToUploadByPlatform returns the next batch index to upload for the given platform:
// one past the highest batch_index that has an "uploaded" record, or startBatch when no such record exists.
// Any other query failure is wrapped and returned together with a zero index.
func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) {
	latestUploadedQuery := o.db.WithContext(ctx).
		Model(&BlobUpload{}).
		Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded).
		Order("batch_index DESC").
		Limit(1)

	var latestUploaded BlobUpload
	err := latestUploadedQuery.First(&latestUploaded).Error
	if err == nil {
		return latestUploaded.BatchIndex + 1, nil
	}
	if errors.Is(err, gorm.ErrRecordNotFound) {
		// Nothing uploaded yet for this platform: begin at the configured start batch.
		return startBatch, nil
	}
	return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err)
}
// GetBlobUploads retrieves the selected blob uploads from the database.
// Each entry of fields is applied as a Where condition (key is the condition, value its argument),
// orderByList entries are applied as Order clauses in the given order, and limit is applied only
// when it is positive. An ordering by batch_index ASC is always appended after the caller's orderings.
func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) {
	query := o.db.WithContext(ctx).Model(&BlobUpload{})

	for condition, arg := range fields {
		query = query.Where(condition, arg)
	}
	for i := range orderByList {
		query = query.Order(orderByList[i])
	}
	if limit > 0 {
		query = query.Limit(limit)
	}

	var uploads []*BlobUpload
	if err := query.Order("batch_index ASC").Find(&uploads).Error; err != nil {
		return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err)
	}
	return uploads, nil
}
// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the status of the existing one.
// A record is looked up by batch_index, batch_hash and platform among rows whose deleted_at is NULL:
// when none is found a new row is created with the given status, otherwise only the status column
// of the found row is updated. A non-nil dbTX[0] is used in place of the ORM's own connection.
// Errors from the lookup, insert, or update are wrapped together with the batch index, batch hash and platform.
// NOTE(review): the lookup and the following insert/update are issued as separate statements; whether
// they run atomically depends on the caller passing a transaction via dbTX — confirm for concurrent writers.
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
blobUpload := &BlobUpload{
BatchIndex: batchIndex,
Platform: int16(platform),
Status: int16(status),
var existing BlobUpload
err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL",
batchIndex, batchHash, int16(platform),
).First(&existing).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
newRecord := BlobUpload{
BatchIndex: batchIndex,
BatchHash: batchHash,
Platform: int16(platform),
Status: int16(status),
}
if err := db.Create(&newRecord).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
return nil
} else if err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
if err := db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
}).Create(blobUpload).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil {
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
}
return nil
}