Compare commits

..

2 Commits

Author SHA1 Message Date
georgehao
1dfca3b7c0 feat(coordinator): prover task record unique (#845)
Co-authored-by: georgehao <georgehao@users.noreply.github.com>
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-08-28 10:51:01 +08:00
colin
826e847b5a fix(rollup-relayer): determine first block height and first chunk index (#861)
Co-authored-by: colinlyguo <colinlyguo@users.noreply.github.com>
Co-authored-by: Haichen Shen <shenhaichen@gmail.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
Co-authored-by: HAOYUatHZ <HAOYUatHZ@users.noreply.github.com>
2023-08-26 15:04:52 +08:00
29 changed files with 260 additions and 193 deletions

View File

@@ -148,7 +148,12 @@ func (p *BatchProposer) updateBatchInfoInDB(dbChunks []*orm.Chunk) error {
}
func (p *BatchProposer) proposeBatchChunks() ([]*orm.Chunk, error) {
dbChunks, err := p.chunkOrm.GetUnbatchedChunks(p.ctx, int(p.maxChunkNumPerBatch)+1)
unbatchedChunkIndex, err := p.batchOrm.GetFirstUnbatchedChunkIndex(p.ctx)
if err != nil {
return nil, err
}
dbChunks, err := p.chunkOrm.GetChunksGEIndex(p.ctx, unbatchedChunkIndex, int(p.maxChunkNumPerBatch)+1)
if err != nil {
return nil, err
}

View File

@@ -39,11 +39,6 @@ func testBatchProposer(t *testing.T) {
}, db, nil)
bp.TryProposeBatch()
chunkOrm := orm.NewChunk(db)
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
assert.NoError(t, err)
assert.Empty(t, chunks)
batchOrm := orm.NewBatch(db)
// get all batches.
batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, []string{}, 0)
@@ -54,6 +49,7 @@ func testBatchProposer(t *testing.T) {
assert.Equal(t, types.RollupPending, types.RollupStatus(batches[0].RollupStatus))
assert.Equal(t, types.ProvingTaskUnassigned, types.ProvingStatus(batches[0].ProvingStatus))
chunkOrm := orm.NewChunk(db)
dbChunks, err := chunkOrm.GetChunksInRange(context.Background(), 0, 0)
assert.NoError(t, err)
assert.Len(t, batches, 1)

View File

@@ -186,7 +186,12 @@ func (p *ChunkProposer) updateChunkInfoInDB(chunk *types.Chunk) error {
}
func (p *ChunkProposer) proposeChunk() (*types.Chunk, error) {
blocks, err := p.l2BlockOrm.GetUnchunkedBlocks(p.ctx, maxNumBlockPerChunk)
unchunkedBlockHeight, err := p.chunkOrm.GetUnchunkedBlockHeight(p.ctx)
if err != nil {
return nil, err
}
blocks, err := p.l2BlockOrm.GetL2WrappedBlocksGEHeight(p.ctx, unchunkedBlockHeight, maxNumBlockPerChunk)
if err != nil {
return nil, err
}

View File

@@ -38,7 +38,7 @@ func testChunkProposer(t *testing.T) {
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
assert.NoError(t, err)
assert.Len(t, chunks, 1)
assert.Equal(t, expectedHash.Hex(), chunks[0].Hash)
@@ -62,7 +62,7 @@ func testChunkProposerRowConsumption(t *testing.T) {
cp.TryProposeChunk()
chunkOrm := orm.NewChunk(db)
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
assert.NoError(t, err)
assert.Len(t, chunks, 0)
}

View File

@@ -142,6 +142,21 @@ func (o *Batch) GetLatestBatch(ctx context.Context) (*Batch, error) {
return &latestBatch, nil
}
// GetFirstUnbatchedChunkIndex retrieves the first unbatched chunk index.
func (o *Batch) GetFirstUnbatchedChunkIndex(ctx context.Context) (uint64, error) {
// Get the latest batch
latestBatch, err := o.GetLatestBatch(ctx)
if err != nil {
return 0, fmt.Errorf("Chunk.GetChunkedBlockHeight error: %w", err)
}
// if parentBatch==nil then err==gorm.ErrRecordNotFound,
// which means there is not batched chunk yet, thus returns 0
if latestBatch == nil {
return 0, nil
}
return latestBatch.EndChunkIndex + 1, nil
}
// GetRollupStatusByHashList retrieves the rollup statuses for a list of batch hashes.
func (o *Batch) GetRollupStatusByHashList(ctx context.Context, hashes []string) ([]types.RollupStatus, error) {
if len(hashes) == 0 {

View File

@@ -87,24 +87,6 @@ func (o *Chunk) GetChunksInRange(ctx context.Context, startIndex uint64, endInde
return chunks, nil
}
// GetUnbatchedChunks retrieves unbatched chunks from the database.
func (o *Chunk) GetUnbatchedChunks(ctx context.Context, limit int) ([]*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("batch_hash IS NULL")
db = db.Order("index asc")
if limit > 0 {
db = db.Limit(limit)
}
var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
return nil, fmt.Errorf("Chunk.GetUnbatchedChunks error: %w", err)
}
return chunks, nil
}
// GetLatestChunk retrieves the latest chunk from the database.
func (o *Chunk) GetLatestChunk(ctx context.Context) (*Chunk, error) {
db := o.db.WithContext(ctx)
@@ -118,6 +100,40 @@ func (o *Chunk) GetLatestChunk(ctx context.Context) (*Chunk, error) {
return &latestChunk, nil
}
// GetUnchunkedBlockHeight retrieves the first unchunked block number.
func (o *Chunk) GetUnchunkedBlockHeight(ctx context.Context) (uint64, error) {
// Get the latest chunk
latestChunk, err := o.GetLatestChunk(ctx)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
// if there is no chunk, return block number 1,
// because no need to chunk genesis block number
return 1, nil
}
return 0, fmt.Errorf("Chunk.GetChunkedBlockHeight error: %w", err)
}
return latestChunk.EndBlockNumber + 1, nil
}
// GetChunksGEIndex retrieves chunks that have a chunk index greater than the or equal to the given index.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetChunksGEIndex(ctx context.Context, index uint64, limit int) ([]*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("index >= ?", index)
db = db.Order("index ASC")
if limit > 0 {
db = db.Limit(limit)
}
var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
return nil, fmt.Errorf("Chunk.GetChunksGEIndex error: %w", err)
}
return chunks, nil
}
// InsertChunk inserts a new chunk into the database.
func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*gorm.DB) (*Chunk, error) {
if chunk == nil || len(chunk.Blocks) == 0 {

View File

@@ -64,13 +64,14 @@ func (o *L2Block) GetL2BlocksLatestHeight(ctx context.Context) (uint64, error) {
return maxNumber, nil
}
// GetUnchunkedBlocks get the l2 blocks that have not been put into a chunk.
// GetL2WrappedBlocksGEHeight retrieves L2 blocks that have a block number greater than or equal to the given height.
// The blocks are converted into WrappedBlock format for output.
// The returned blocks are sorted in ascending order by their block number.
func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.WrappedBlock, error) {
func (o *L2Block) GetL2WrappedBlocksGEHeight(ctx context.Context, height uint64, limit int) ([]*types.WrappedBlock, error) {
db := o.db.WithContext(ctx)
db = db.Model(&L2Block{})
db = db.Select("header, transactions, withdraw_root, row_consumption")
db = db.Where("chunk_hash IS NULL")
db = db.Where("number >= ?", height)
db = db.Order("number ASC")
if limit > 0 {
@@ -79,7 +80,7 @@ func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.W
var l2Blocks []L2Block
if err := db.Find(&l2Blocks).Error; err != nil {
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
}
var wrappedBlocks []*types.WrappedBlock
@@ -87,18 +88,18 @@ func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.W
var wrappedBlock types.WrappedBlock
if err := json.Unmarshal([]byte(v.Transactions), &wrappedBlock.Transactions); err != nil {
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
}
wrappedBlock.Header = &gethTypes.Header{}
if err := json.Unmarshal([]byte(v.Header), wrappedBlock.Header); err != nil {
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
}
wrappedBlock.WithdrawRoot = common.HexToHash(v.WithdrawRoot)
if err := json.Unmarshal([]byte(v.RowConsumption), &wrappedBlock.RowConsumption); err != nil {
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
}
wrappedBlocks = append(wrappedBlocks, &wrappedBlock)

View File

@@ -101,25 +101,26 @@ func TestL2BlockOrm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(3), height)
blocks, err := l2BlockOrm.GetUnchunkedBlocks(context.Background(), 0)
blocks, err := l2BlockOrm.GetL2Blocks(context.Background(), map[string]interface{}{}, []string{}, 0)
assert.NoError(t, err)
assert.Len(t, blocks, 2)
assert.Equal(t, wrappedBlock1, blocks[0])
assert.Equal(t, wrappedBlock2, blocks[1])
assert.Equal(t, "", blocks[0].ChunkHash)
assert.Equal(t, "", blocks[1].ChunkHash)
blocks, err = l2BlockOrm.GetL2BlocksInRange(context.Background(), 2, 3)
wrappedBlocks, err := l2BlockOrm.GetL2BlocksInRange(context.Background(), 2, 3)
assert.NoError(t, err)
assert.Len(t, blocks, 2)
assert.Equal(t, wrappedBlock1, blocks[0])
assert.Equal(t, wrappedBlock2, blocks[1])
assert.Equal(t, wrappedBlock1, wrappedBlocks[0])
assert.Equal(t, wrappedBlock2, wrappedBlocks[1])
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 2, 2, "test hash")
assert.NoError(t, err)
blocks, err = l2BlockOrm.GetUnchunkedBlocks(context.Background(), 0)
blocks, err = l2BlockOrm.GetL2Blocks(context.Background(), map[string]interface{}{}, []string{}, 0)
assert.NoError(t, err)
assert.Len(t, blocks, 1)
assert.Equal(t, wrappedBlock2, blocks[0])
assert.Len(t, blocks, 2)
assert.Equal(t, "test hash", blocks[0].ChunkHash)
assert.Equal(t, "", blocks[1].ChunkHash)
}
func TestChunkOrm(t *testing.T) {
@@ -135,11 +136,13 @@ func TestChunkOrm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, dbChunk2.Hash, chunkHash2.Hex())
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
assert.NoError(t, err)
assert.Len(t, chunks, 2)
assert.Equal(t, chunkHash1.Hex(), chunks[0].Hash)
assert.Equal(t, chunkHash2.Hex(), chunks[1].Hash)
assert.Equal(t, "", chunks[0].BatchHash)
assert.Equal(t, "", chunks[1].BatchHash)
err = chunkOrm.UpdateProvingStatus(context.Background(), chunkHash1.Hex(), types.ProvingTaskVerified)
assert.NoError(t, err)
@@ -156,9 +159,13 @@ func TestChunkOrm(t *testing.T) {
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, "test hash")
assert.NoError(t, err)
chunks, err = chunkOrm.GetUnbatchedChunks(context.Background(), 0)
chunks, err = chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
assert.NoError(t, err)
assert.Len(t, chunks, 1)
assert.Len(t, chunks, 2)
assert.Equal(t, chunkHash1.Hex(), chunks[0].Hash)
assert.Equal(t, chunkHash2.Hex(), chunks[1].Hash)
assert.Equal(t, "test hash", chunks[0].BatchHash)
assert.Equal(t, "", chunks[1].BatchHash)
}
func TestBatchOrm(t *testing.T) {

View File

@@ -66,8 +66,12 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
}, db, nil)
cp.TryProposeChunk()
batchOrm := orm.NewBatch(db)
unbatchedChunkIndex, err := batchOrm.GetFirstUnbatchedChunkIndex(context.Background())
assert.NoError(t, err)
chunkOrm := orm.NewChunk(db)
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), unbatchedChunkIndex, 0)
assert.NoError(t, err)
assert.Len(t, chunks, 1)
@@ -81,7 +85,6 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
l2Relayer.ProcessPendingBatches()
batchOrm := orm.NewBatch(db)
batch, err := batchOrm.GetLatestBatch(context.Background())
assert.NoError(t, err)
assert.NotNil(t, batch)

View File

@@ -211,6 +211,7 @@ func (a *ProofMsg) PublicKey() (string, error) {
// TaskMsg is a wrapper type around db ProveTask type.
type TaskMsg struct {
UUID string `json:"uuid"`
ID string `json:"id"`
Type ProofType `json:"type,omitempty"`
BatchTaskDetail *BatchTaskDetail `json:"batch_task_detail,omitempty"`

View File

@@ -7,7 +7,7 @@ import (
"strings"
)
var tag = "v4.1.116"
var tag = "v4.2.0"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -47,7 +47,10 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
)
require github.com/prometheus/client_golang v1.14.0
require (
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.14.0
)
require (
github.com/beorn7/perks v1.0.1 // indirect

View File

@@ -73,6 +73,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iden3/go-iden3-crypto v0.0.15 h1:4MJYlrot1l31Fzlo2sF56u7EVFeHHJkxGXXZCtESgK4=

View File

@@ -163,16 +163,14 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
err := c.db.Transaction(func(tx *gorm.DB) error {
// update prover task proving status as ProverProofInvalid
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, assignedProverTask.UUID, types.ProverProofInvalid, tx); err != nil {
log.Error("update prover task proving status failure", "uuid", assignedProverTask.UUID, "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
return err
}
// update prover task failure type
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, assignedProverTask.UUID, types.ProverTaskFailureTypeTimeout, tx); err != nil {
log.Error("update prover task failure type failure", "uuid", assignedProverTask.UUID, "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
return err
}

View File

@@ -128,13 +128,13 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}
// Store session info.
if err = bp.proverTaskOrm.SetProverTask(ctx, &proverTask); err != nil {
if err = bp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
bp.recoverProvingStatus(ctx, batchTask)
log.Error("db set session info fail", "task hash", batchTask.Hash, "prover name", proverName.(string), "prover pubKey", publicKey.(string), "err", err)
log.Error("insert batch prover task info fail", "taskID", batchTask.Hash, "publicKey", publicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
taskMsg, err := bp.formatProverTask(ctx, batchTask.Hash)
taskMsg, err := bp.formatProverTask(ctx, &proverTask)
if err != nil {
bp.recoverProvingStatus(ctx, batchTask)
log.Error("format prover task failure", "hash", batchTask.Hash, "err", err)
@@ -146,11 +146,11 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return taskMsg, nil
}
func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string) (*coordinatorType.GetTaskSchema, error) {
func (bp *BatchProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask) (*coordinatorType.GetTaskSchema, error) {
// get chunk from db
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, taskID)
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, task.TaskID)
if err != nil {
err = fmt.Errorf("failed to get chunk proofs for batch task id:%s err:%w ", taskID, err)
err = fmt.Errorf("failed to get chunk proofs for batch task id:%s err:%w ", task.TaskID, err)
return nil, err
}
@@ -159,7 +159,7 @@ func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string)
for _, chunk := range chunks {
var proof message.ChunkProof
if encodeErr := json.Unmarshal(chunk.Proof, &proof); encodeErr != nil {
return nil, fmt.Errorf("Chunk.GetProofsByBatchHash unmarshal proof error: %w, batch hash: %v, chunk hash: %v", encodeErr, taskID, chunk.Hash)
return nil, fmt.Errorf("Chunk.GetProofsByBatchHash unmarshal proof error: %w, batch hash: %v, chunk hash: %v", encodeErr, task.TaskID, chunk.Hash)
}
chunkProofs = append(chunkProofs, &proof)
@@ -181,11 +181,12 @@ func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string)
chunkProofsBytes, err := json.Marshal(taskDetail)
if err != nil {
return nil, fmt.Errorf("failed to marshal chunk proofs, taskID:%s err:%w", taskID, err)
return nil, fmt.Errorf("failed to marshal chunk proofs, taskID:%s err:%w", task.TaskID, err)
}
taskMsg := &coordinatorType.GetTaskSchema{
TaskID: taskID,
UUID: task.UUID.String(),
TaskID: task.TaskID,
TaskType: int(message.ProofTypeBatch),
TaskData: string(chunkProofsBytes),
}

View File

@@ -132,13 +132,14 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
// here why need use UTC time. see scroll/common/databased/db.go
AssignedAt: utils.NowUTC(),
}
if err = cp.proverTaskOrm.SetProverTask(ctx, &proverTask); err != nil {
if err = cp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
cp.recoverProvingStatus(ctx, chunkTask)
log.Error("db set session info fail", "task hash", chunkTask.Hash, "prover name", proverName.(string), "prover pubKey", publicKey.(string), "err", err)
log.Error("insert chunk prover task fail", "taskID", chunkTask.Hash, "publicKey", publicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
taskMsg, err := cp.formatProverTask(ctx, chunkTask.Hash)
taskMsg, err := cp.formatProverTask(ctx, &proverTask)
if err != nil {
cp.recoverProvingStatus(ctx, chunkTask)
log.Error("format prover task failure", "hash", chunkTask.Hash, "err", err)
@@ -150,11 +151,11 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return taskMsg, nil
}
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, hash string) (*coordinatorType.GetTaskSchema, error) {
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask) (*coordinatorType.GetTaskSchema, error) {
// Get block hashes.
wrappedBlocks, wrappedErr := cp.blockOrm.GetL2BlocksByChunkHash(ctx, hash)
wrappedBlocks, wrappedErr := cp.blockOrm.GetL2BlocksByChunkHash(ctx, task.TaskID)
if wrappedErr != nil || len(wrappedBlocks) == 0 {
return nil, fmt.Errorf("failed to fetch wrapped blocks, batch hash:%s err:%w", hash, wrappedErr)
return nil, fmt.Errorf("failed to fetch wrapped blocks, chunk hash:%s err:%w", task.TaskID, wrappedErr)
}
blockHashes := make([]common.Hash, len(wrappedBlocks))
@@ -167,11 +168,12 @@ func (cp *ChunkProverTask) formatProverTask(ctx context.Context, hash string) (*
}
blockHashesBytes, err := json.Marshal(taskDetail)
if err != nil {
return nil, fmt.Errorf("failed to marshal block hashes hash:%s, err:%w", hash, err)
return nil, fmt.Errorf("failed to marshal block hashes hash:%s, err:%w", task.TaskID, err)
}
proverTaskSchema := &coordinatorType.GetTaskSchema{
TaskID: hash,
UUID: task.UUID.String(),
TaskID: task.TaskID,
TaskType: int(message.ProofTypeChunk),
TaskData: string(blockHashesBytes),
}

View File

@@ -57,10 +57,6 @@ func (b *BaseProverTask) checkAttemptsExceeded(hash string, taskType message.Pro
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
}
}
// update the prover task status to let timeout checker don't check it.
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
}
return nil
})
if transErr == nil {

View File

@@ -135,10 +135,21 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
return fmt.Errorf("get ProverVersion from context failed")
}
proverTask, err := m.proverTaskOrm.GetProverTaskByTaskIDAndProver(ctx, proofMsg.Type, proofMsg.ID, pk, pv)
if proverTask == nil || err != nil {
log.Error("get none prover task for the proof", "key", pk, "taskID", proofMsg.ID, "error", err)
return ErrValidatorFailureProverTaskEmpty
var proverTask *orm.ProverTask
var err error
if proofParameter.UUID != "" {
proverTask, err = m.proverTaskOrm.GetProverTaskByUUIDAndPublicKey(ctx, proofParameter.UUID, pk)
if proverTask == nil || err != nil {
log.Error("get none prover task for the proof", "uuid", proofParameter.UUID, "key", pk, "taskID", proofMsg.ID, "error", err)
return ErrValidatorFailureProverTaskEmpty
}
} else {
// TODO When prover all have upgrade, need delete this logic
proverTask, err = m.proverTaskOrm.GetAssignedProverTaskByTaskIDAndProver(ctx, proofMsg.Type, proofMsg.ID, pk, pv)
if proverTask == nil || err != nil {
log.Error("get none prover task for the proof", "key", pk, "taskID", proofMsg.ID, "error", err)
return ErrValidatorFailureProverTaskEmpty
}
}
proofTime := time.Since(proverTask.CreatedAt)
@@ -163,7 +174,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
if verifyErr != nil || !success {
m.verifierFailureTotal.WithLabelValues(pv).Inc()
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.proofRecover(ctx, proverTask, proofMsg)
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
@@ -179,9 +190,9 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
log.Info("proof verified and valid", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec)
if err := m.closeProofTask(ctx, proofMsg.ID, pk, proofMsg, proofTimeSec); err != nil {
if err := m.closeProofTask(ctx, proverTask, proofMsg, proofTimeSec); err != nil {
m.proofSubmitFailure.Inc()
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.proofRecover(ctx, proverTask, proofMsg)
return ErrCoordinatorInternalFailure
}
@@ -239,14 +250,14 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
// Verify if the proving task has already been assigned to another prover.
// Upon receiving an error message, it's possible the proving status has been reset by another prover
// and the task has been reassigned. In this case, the coordinator should avoid resetting the proving status.
m.processProverErr(ctx, proofMsg.ID, pk, proofMsg.Type)
m.processProverErr(ctx, proverTask)
m.validateFailureProverTaskStatusNotOk.Inc()
log.Info("proof generated by prover failed",
"taskType", proofMsg.Type, "hash", proofMsg.ID, "proverName", proverTask.ProverName,
"proverVersion", proverTask.ProverVersion, "proverPublicKey", pk, "failureType", proofParameter.FailureType,
"failureMessage", "failureMessage", failureMsg)
"failureMessage", failureMsg)
return ErrValidatorFailureProofMsgStatusNotOk
}
@@ -259,7 +270,7 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
}
// store the proof to prover task
if updateTaskProofErr := m.updateProverTaskProof(ctx, pk, proofMsg); updateTaskProofErr != nil {
if updateTaskProofErr := m.updateProverTaskProof(ctx, proverTask, proofMsg); updateTaskProofErr != nil {
log.Warn("update prover task proof failure", "hash", proofMsg.ID, "proverPublicKey", pk,
"taskType", proverTask.TaskType, "proverName", proverTask.ProverName, "error", updateTaskProofErr)
}
@@ -274,28 +285,28 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
return nil
}
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
log.Info("proof recover update proof status", "hash", hash, "proverPublicKey", pubKey,
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) {
log.Info("proof recover update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskUnassigned.String())
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", hash, "pubKey", pubKey, "error", err)
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey, "error", err)
}
}
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
log.Info("proof close task update proof status", "hash", hash, "proverPublicKey", pubKey,
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
log.Info("proof close task update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskVerified.String())
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
log.Error("failed to updated proof status ProvingTaskVerified", "hash", hash, "proverPublicKey", pubKey, "error", err)
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
log.Error("failed to updated proof status ProvingTaskVerified", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "error", err)
return err
}
return nil
}
// UpdateProofStatus update the chunk/batch task and session info status
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, proverPublicKey string, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
var proverTaskStatus types.ProverProveStatus
switch status {
case types.ProvingTaskFailed, types.ProvingTaskUnassigned:
@@ -305,13 +316,13 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
}
err := m.db.Transaction(func(tx *gorm.DB) error {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proofMsg.Type, hash, proverPublicKey, proverTaskStatus, tx); updateErr != nil {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proverTask.UUID, proverTaskStatus, tx); updateErr != nil {
return updateErr
}
// if the block batch has proof verified, so the failed status not update block batch proving status
if m.checkIsTaskSuccess(ctx, hash, proofMsg.Type) {
log.Info("update proof status skip because this chunk / batch has been verified", "hash", hash, "public key", proverPublicKey)
if m.checkIsTaskSuccess(ctx, proverTask.TaskID, proofMsg.Type) {
log.Info("update proof status skip because this chunk / batch has been verified", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey)
return nil
}
@@ -324,20 +335,20 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
storeProofErr = m.batchOrm.UpdateProofByHash(ctx, proofMsg.ID, proofMsg.BatchProof, proofTimeSec, tx)
}
if storeProofErr != nil {
log.Error("failed to store chunk/batch proof into db", "hash", hash, "public key", proverPublicKey, "error", storeProofErr)
log.Error("failed to store chunk/batch proof into db", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey, "error", storeProofErr)
return storeProofErr
}
}
switch proofMsg.Type {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
if err := m.chunkOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "hash", proverTask.TaskID, "error", err)
return err
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
if err := m.batchOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "hash", proverTask.TaskID, "error", err)
return err
}
}
@@ -349,7 +360,7 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
}
if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, proverTask.TaskID); checkReadyErr != nil {
log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
return checkReadyErr
}
@@ -378,14 +389,15 @@ func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string
return provingStatus == types.ProvingTaskVerified
}
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk string, taskType message.ProofType) {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, taskType, taskID, pk, types.ProverProofInvalid); updateErr != nil {
log.Error("update prover task proving status failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", updateErr)
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, proverTask *orm.ProverTask) {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proverTask.UUID, types.ProverProofInvalid); updateErr != nil {
log.Error("update prover task proving status failure", "uuid", proverTask.UUID, "taskID", proverTask.TaskID, "proverPublicKey",
proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", updateErr)
}
proverTasks, err := m.proverTaskOrm.GetAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk)
proverTasks, err := m.proverTaskOrm.GetAssignedTaskOfOtherProvers(ctx, message.ProofType(proverTask.TaskType), proverTask.TaskID, proverTask.ProverPublicKey)
if err != nil {
log.Warn("checkIsAssignedToOtherProver failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
log.Warn("checkIsAssignedToOtherProver failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
return
}
@@ -393,19 +405,19 @@ func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk st
return
}
switch taskType {
switch message.ProofType(proverTask.TaskType) {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update chunk proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update chunk proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update batch proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update batch proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
}
}
}
func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, pk string, proofMsg *message.ProofMsg) error {
func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) error {
// store the proof to prover task
var proofBytes []byte
var marshalErr error
@@ -419,5 +431,5 @@ func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, pk strin
if len(proofBytes) == 0 || marshalErr != nil {
return fmt.Errorf("updateProverTaskProof marshal proof error:%w", marshalErr)
}
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proofMsg.Type, proofMsg.ID, pk, proofBytes)
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
}

View File

@@ -306,15 +306,8 @@ func (o *Chunk) UpdateProvingStatus(ctx context.Context, hash string, status typ
func (o *Chunk) UpdateProvingStatusFromProverError(ctx context.Context, hash string, status types.ProvingStatus) error {
updateFields := make(map[string]interface{})
updateFields["proving_status"] = int(status)
updateFields["prover_assigned_at"] = nil
switch status {
case types.ProvingTaskAssigned:
updateFields["prover_assigned_at"] = time.Now()
case types.ProvingTaskUnassigned:
updateFields["prover_assigned_at"] = nil
case types.ProvingTaskVerified:
updateFields["proved_at"] = time.Now()
}
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("hash", hash).Where("proving_status", types.ProvingTaskAssigned)

View File

@@ -77,12 +77,13 @@ func TestProverTaskOrm(t *testing.T) {
AssignedAt: utils.NowUTC(),
}
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
assert.NoError(t, err)
proverTasks, err := proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
assert.NoError(t, err)
assert.Equal(t, 1, len(proverTasks))
assert.Equal(t, proverTask.ProverName, proverTasks[0].ProverName)
assert.NotEqual(t, proverTask.UUID.String(), "00000000-0000-0000-0000-000000000000")
// test decimal reward, get reward
resultReward := proverTasks[0].Reward.BigInt()
@@ -91,12 +92,8 @@ func TestProverTaskOrm(t *testing.T) {
proverTask.ProvingStatus = int16(types.ProverProofValid)
proverTask.AssignedAt = utils.NowUTC()
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
assert.NoError(t, err)
proverTasks, err = proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
assert.NoError(t, err)
assert.Equal(t, 1, len(proverTasks))
assert.Equal(t, proverTask.ProvingStatus, proverTasks[0].ProvingStatus)
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
assert.Error(t, err)
}
func TestProverTaskOrmUint256(t *testing.T) {
@@ -117,8 +114,9 @@ func TestProverTaskOrmUint256(t *testing.T) {
AssignedAt: utils.NowUTC(),
}
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
assert.NoError(t, err)
assert.NotEqual(t, proverTask.UUID.String(), "00000000-0000-0000-0000-000000000000")
proverTasksUint256, err := proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
assert.NoError(t, err)
assert.Equal(t, 1, len(proverTasksUint256))

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/shopspring/decimal"
"gorm.io/gorm"
"gorm.io/gorm/clause"
@@ -18,7 +19,8 @@ import (
type ProverTask struct {
db *gorm.DB `gorm:"column:-"`
ID int64 `json:"id" gorm:"column:id"`
ID int64 `json:"id" gorm:"column:id"`
UUID uuid.UUID `json:"uuid" gorm:"column:uuid;type:uuid;default:gen_random_uuid()"`
// prover
ProverPublicKey string `json:"prover_public_key" gorm:"column:prover_public_key"`
@@ -114,14 +116,16 @@ func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, taskType messag
return proverTasks, nil
}
// GetProverTaskByTaskIDAndProver get prover task taskID and public key
func (o *ProverTask) GetProverTaskByTaskIDAndProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
// GetAssignedProverTaskByTaskIDAndProver get prover task taskID and public key
// TODO: when prover all upgrade need DEPRECATED this function
func (o *ProverTask) GetAssignedProverTaskByTaskIDAndProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id", taskID)
db = db.Where("prover_public_key", proverPublicKey)
db = db.Where("prover_version", proverVersion)
db = db.Where("proving_status", types.ProverAssigned)
var proverTask ProverTask
err := db.First(&proverTask).Error
@@ -131,6 +135,21 @@ func (o *ProverTask) GetProverTaskByTaskIDAndProver(ctx context.Context, taskTyp
return &proverTask, nil
}
// GetProverTaskByUUIDAndPublicKey get prover task taskID by uuid and public key
func (o *ProverTask) GetProverTaskByUUIDAndPublicKey(ctx context.Context, uuid, publicKey string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("uuid", uuid)
db = db.Where("prover_public_key", publicKey)
var proverTask ProverTask
err := db.First(&proverTask).Error
if err != nil {
return nil, fmt.Errorf("ProverTask.GetProverTaskByUUID err:%w, uuid:%s publicKey:%s", err, uuid, publicKey)
}
return &proverTask, nil
}
// GetAssignedTaskOfOtherProvers get the chunk/batch task assigned other provers
func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey string) ([]ProverTask, error) {
db := o.db.WithContext(ctx)
@@ -199,96 +218,59 @@ func (o *ProverTask) TaskTimeoutMoreThanOnce(ctx context.Context, taskType messa
return false
}
// SetProverTask updates or inserts a ProverTask record.
func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
// InsertProverTask insert a prover Task record
func (o *ProverTask) InsertProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
db := o.db.WithContext(ctx)
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.Clauses(clause.Returning{})
db = db.Model(&ProverTask{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "task_type"}, {Name: "task_id"}, {Name: "prover_public_key"}, {Name: "prover_version"}},
DoUpdates: clause.AssignmentColumns([]string{"proving_status", "failure_type", "assigned_at"}),
})
if err := db.Create(&proverTask).Error; err != nil {
return fmt.Errorf("ProverTask.SetProverTask error: %w, prover task: %v", err, proverTask)
if err := db.Create(proverTask).Error; err != nil {
return fmt.Errorf("ProverTask.InsertProverTask error: %w, prover task: %v", err, proverTask)
}
return nil
}
// UpdateProverTaskProof update the prover task's proof
func (o *ProverTask) UpdateProverTaskProof(ctx context.Context, proofType message.ProofType, taskID string, pk string, proof []byte) error {
func (o *ProverTask) UpdateProverTaskProof(ctx context.Context, uuid uuid.UUID, proof []byte) error {
db := o.db
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk)
db = db.Where("uuid = ?", uuid)
if err := db.Update("proof", proof).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateProverTaskProof error: %w, proof type: %v, taskID: %v, prover public key: %v", err, proofType.String(), taskID, pk)
return fmt.Errorf("ProverTask.UpdateProverTaskProof error: %w, uuid: %v", err, uuid)
}
return nil
}
// UpdateProverTaskProvingStatus updates the proving_status of a specific ProverTask record.
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, uuid uuid.UUID, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk)
db = db.Where("uuid = ?", uuid)
if err := db.Update("proving_status", status).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, proof type: %v, taskID: %v, prover public key: %v, status: %v", err, proofType.String(), taskID, pk, status.String())
}
return nil
}
// UpdateAllProverTaskProvingStatusOfTaskID updates all the proving_status of a specific task id.
func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type = ? AND task_id = ?", int(proofType), taskID)
if err := db.Update("proving_status", status).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateAllProverTaskProvingStatusOfTaskID error: %w, proof type: %v, taskID: %v, status: %v", err, proofType.String(), taskID, status.String())
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, uuid:%s, status: %v", err, uuid, status.String())
}
return nil
}
// UpdateProverTaskFailureType update the prover task failure type
func (o *ProverTask) UpdateProverTaskFailureType(ctx context.Context, proofType message.ProofType, taskID string, pk string, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
func (o *ProverTask) UpdateProverTaskFailureType(ctx context.Context, uuid uuid.UUID, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id", taskID).Where("prover_public_key", pk).Where("task_type", int(proofType))
db = db.Where("uuid", uuid)
if err := db.Update("failure_type", int(failureType)).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateProverTaskFailureType error: %w, proof type: %v, taskID: %v, prover public key: %v, failure type: %v", err, proofType.String(), taskID, pk, failureType.String())
}
return nil
}
// UpdateAllProverTaskFailureTypeOfTaskID update the prover task failure type
func (o *ProverTask) UpdateAllProverTaskFailureTypeOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id", taskID).Where("task_type", int(proofType))
if err := db.Update("failure_type", int(failureType)).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateAllProverTaskFailureTypeOfTaskID error: %w, proof type: %v, taskID: %v, failure type: %v", err, proofType.String(), taskID, failureType.String())
return fmt.Errorf("ProverTask.UpdateProverTaskFailureType error: %w, uuid:%s, failure type: %v", err, uuid.String(), failureType.String())
}
return nil
}

View File

@@ -9,6 +9,7 @@ type GetTaskParameter struct {
// GetTaskSchema the schema data return to prover for get prover task
type GetTaskSchema struct {
UUID string `json:"uuid"`
TaskID string `json:"task_id"`
TaskType int `json:"task_type"`
TaskData string `json:"task_data"`

View File

@@ -2,6 +2,8 @@ package types
// SubmitProofParameter the SubmitProof api request parameter
type SubmitProofParameter struct {
// TODO when prover have upgrade, need change this field to required
UUID string `form:"uuid" json:"uuid"`
TaskID string `form:"task_id" json:"task_id" binding:"required"`
TaskType int `form:"task_type" json:"task_type" binding:"required"`
Status int `form:"status" json:"status"`

View File

@@ -63,7 +63,7 @@ func testResetDB(t *testing.T) {
cur, err := Current(pgDB.DB)
assert.NoError(t, err)
// total number of tables.
assert.Equal(t, 8, int(cur))
assert.Equal(t, 9, int(cur))
}
func testMigrate(t *testing.T) {

View File

@@ -0,0 +1,16 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE prover_task ADD COLUMN uuid uuid DEFAULT gen_random_uuid() NOT NULL UNIQUE;
create index if not exists idx_uuid on prover_task (uuid) where deleted_at IS NULL;
ALTER TABLE prover_task DROP CONSTRAINT uk_tasktype_taskid_publickey_version;
drop index if exists uk_tasktype_taskid_publickey_version;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
create unique index if not exists uk_tasktype_taskid_publickey_version
on prover_task (task_type, task_id, prover_public_key, prover_version) where deleted_at IS NULL;
-- +goose StatementEnd

View File

@@ -51,6 +51,7 @@ type GetTaskResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
Data *struct {
UUID string `json:"uuid"`
TaskID string `json:"task_id"`
TaskType int `json:"task_type"`
TaskData string `json:"task_data"`
@@ -59,6 +60,7 @@ type GetTaskResponse struct {
// SubmitProofRequest defines the request structure for the SubmitProof API.
type SubmitProofRequest struct {
UUID string `json:"uuid"`
TaskID string `json:"task_id"`
TaskType int `json:"task_type"`
Status int `json:"status"`

View File

@@ -4,6 +4,7 @@ go 1.19
require (
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.3.0
github.com/scroll-tech/go-ethereum v1.10.14-0.20230812030736-25fe3ba69a28
github.com/stretchr/testify v1.8.3
github.com/urfave/cli/v2 v2.25.7
@@ -19,7 +20,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
github.com/huin/goupnp v1.0.3 // indirect

View File

@@ -164,7 +164,7 @@ func (r *Prover) proveAndSubmit() error {
log.Error("failed to prove task", "task_type", task.Task.Type, "task-id", task.Task.ID, "err", err)
return r.submitErr(task, message.ProofFailureNoPanic, err)
}
return r.submitProof(proofMsg)
return r.submitProof(proofMsg, task.Task.UUID)
}
// if tried times >= 3, it's probably due to circuit proving panic
@@ -203,6 +203,7 @@ func (r *Prover) fetchTaskFromCoordinator() (*store.ProvingTask, error) {
// create a new TaskMsg
taskMsg := message.TaskMsg{
UUID: resp.Data.UUID,
ID: resp.Data.TaskID,
Type: message.ProofType(resp.Data.TaskType),
}
@@ -295,9 +296,10 @@ func (r *Prover) proveBatch(task *store.ProvingTask) (*message.BatchProof, error
return r.proverCore.ProveBatch(task.Task.ID, task.Task.BatchTaskDetail.ChunkInfos, task.Task.BatchTaskDetail.ChunkProofs)
}
func (r *Prover) submitProof(msg *message.ProofDetail) error {
func (r *Prover) submitProof(msg *message.ProofDetail, uuid string) error {
// prepare the submit request
req := &client.SubmitProofRequest{
UUID: uuid,
TaskID: msg.ID,
TaskType: int(msg.Type),
Status: int(msg.Status),
@@ -344,6 +346,7 @@ func (r *Prover) submitProof(msg *message.ProofDetail) error {
func (r *Prover) submitErr(task *store.ProvingTask, proofFailureType message.ProofFailureType, err error) error {
// prepare the submit request
req := &client.SubmitProofRequest{
UUID: task.Task.UUID,
TaskID: task.Task.ID,
TaskType: int(task.Task.Type),
Status: int(message.StatusProofError),

View File

@@ -6,6 +6,7 @@ import (
"strconv"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"scroll-tech/common/types/message"
@@ -23,9 +24,12 @@ func TestStack(t *testing.T) {
defer s.Close()
for i := 0; i < 3; i++ {
taskUUID, uuidErr := uuid.NewRandom()
assert.NoError(t, uuidErr)
task := &ProvingTask{
Task: &message.TaskMsg{
ID: strconv.Itoa(i),
UUID: taskUUID.String(),
ID: strconv.Itoa(i),
},
Times: 0,
}
@@ -44,9 +48,12 @@ func TestStack(t *testing.T) {
}
// test times
taskUUID, uuidErr := uuid.NewRandom()
assert.NoError(t, uuidErr)
task := &ProvingTask{
Task: &message.TaskMsg{
ID: strconv.Itoa(1),
UUID: taskUUID.String(),
ID: strconv.Itoa(1),
},
Times: 0,
}