mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 23:18:07 -05:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dfca3b7c0 | ||
|
|
826e847b5a |
@@ -148,7 +148,12 @@ func (p *BatchProposer) updateBatchInfoInDB(dbChunks []*orm.Chunk) error {
|
||||
}
|
||||
|
||||
func (p *BatchProposer) proposeBatchChunks() ([]*orm.Chunk, error) {
|
||||
dbChunks, err := p.chunkOrm.GetUnbatchedChunks(p.ctx, int(p.maxChunkNumPerBatch)+1)
|
||||
unbatchedChunkIndex, err := p.batchOrm.GetFirstUnbatchedChunkIndex(p.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbChunks, err := p.chunkOrm.GetChunksGEIndex(p.ctx, unbatchedChunkIndex, int(p.maxChunkNumPerBatch)+1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -39,11 +39,6 @@ func testBatchProposer(t *testing.T) {
|
||||
}, db, nil)
|
||||
bp.TryProposeBatch()
|
||||
|
||||
chunkOrm := orm.NewChunk(db)
|
||||
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, chunks)
|
||||
|
||||
batchOrm := orm.NewBatch(db)
|
||||
// get all batches.
|
||||
batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, []string{}, 0)
|
||||
@@ -54,6 +49,7 @@ func testBatchProposer(t *testing.T) {
|
||||
assert.Equal(t, types.RollupPending, types.RollupStatus(batches[0].RollupStatus))
|
||||
assert.Equal(t, types.ProvingTaskUnassigned, types.ProvingStatus(batches[0].ProvingStatus))
|
||||
|
||||
chunkOrm := orm.NewChunk(db)
|
||||
dbChunks, err := chunkOrm.GetChunksInRange(context.Background(), 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, batches, 1)
|
||||
|
||||
@@ -186,7 +186,12 @@ func (p *ChunkProposer) updateChunkInfoInDB(chunk *types.Chunk) error {
|
||||
}
|
||||
|
||||
func (p *ChunkProposer) proposeChunk() (*types.Chunk, error) {
|
||||
blocks, err := p.l2BlockOrm.GetUnchunkedBlocks(p.ctx, maxNumBlockPerChunk)
|
||||
unchunkedBlockHeight, err := p.chunkOrm.GetUnchunkedBlockHeight(p.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blocks, err := p.l2BlockOrm.GetL2WrappedBlocksGEHeight(p.ctx, unchunkedBlockHeight, maxNumBlockPerChunk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ func testChunkProposer(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
chunkOrm := orm.NewChunk(db)
|
||||
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, chunks, 1)
|
||||
assert.Equal(t, expectedHash.Hex(), chunks[0].Hash)
|
||||
@@ -62,7 +62,7 @@ func testChunkProposerRowConsumption(t *testing.T) {
|
||||
cp.TryProposeChunk()
|
||||
|
||||
chunkOrm := orm.NewChunk(db)
|
||||
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, chunks, 0)
|
||||
}
|
||||
|
||||
@@ -142,6 +142,21 @@ func (o *Batch) GetLatestBatch(ctx context.Context) (*Batch, error) {
|
||||
return &latestBatch, nil
|
||||
}
|
||||
|
||||
// GetFirstUnbatchedChunkIndex retrieves the first unbatched chunk index.
|
||||
func (o *Batch) GetFirstUnbatchedChunkIndex(ctx context.Context) (uint64, error) {
|
||||
// Get the latest batch
|
||||
latestBatch, err := o.GetLatestBatch(ctx)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("Chunk.GetChunkedBlockHeight error: %w", err)
|
||||
}
|
||||
// if parentBatch==nil then err==gorm.ErrRecordNotFound,
|
||||
// which means there is not batched chunk yet, thus returns 0
|
||||
if latestBatch == nil {
|
||||
return 0, nil
|
||||
}
|
||||
return latestBatch.EndChunkIndex + 1, nil
|
||||
}
|
||||
|
||||
// GetRollupStatusByHashList retrieves the rollup statuses for a list of batch hashes.
|
||||
func (o *Batch) GetRollupStatusByHashList(ctx context.Context, hashes []string) ([]types.RollupStatus, error) {
|
||||
if len(hashes) == 0 {
|
||||
|
||||
@@ -87,24 +87,6 @@ func (o *Chunk) GetChunksInRange(ctx context.Context, startIndex uint64, endInde
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
// GetUnbatchedChunks retrieves unbatched chunks from the database.
|
||||
func (o *Chunk) GetUnbatchedChunks(ctx context.Context, limit int) ([]*Chunk, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&Chunk{})
|
||||
db = db.Where("batch_hash IS NULL")
|
||||
db = db.Order("index asc")
|
||||
|
||||
if limit > 0 {
|
||||
db = db.Limit(limit)
|
||||
}
|
||||
|
||||
var chunks []*Chunk
|
||||
if err := db.Find(&chunks).Error; err != nil {
|
||||
return nil, fmt.Errorf("Chunk.GetUnbatchedChunks error: %w", err)
|
||||
}
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
// GetLatestChunk retrieves the latest chunk from the database.
|
||||
func (o *Chunk) GetLatestChunk(ctx context.Context) (*Chunk, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
@@ -118,6 +100,40 @@ func (o *Chunk) GetLatestChunk(ctx context.Context) (*Chunk, error) {
|
||||
return &latestChunk, nil
|
||||
}
|
||||
|
||||
// GetUnchunkedBlockHeight retrieves the first unchunked block number.
|
||||
func (o *Chunk) GetUnchunkedBlockHeight(ctx context.Context) (uint64, error) {
|
||||
// Get the latest chunk
|
||||
latestChunk, err := o.GetLatestChunk(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// if there is no chunk, return block number 1,
|
||||
// because no need to chunk genesis block number
|
||||
return 1, nil
|
||||
}
|
||||
return 0, fmt.Errorf("Chunk.GetChunkedBlockHeight error: %w", err)
|
||||
}
|
||||
return latestChunk.EndBlockNumber + 1, nil
|
||||
}
|
||||
|
||||
// GetChunksGEIndex retrieves chunks that have a chunk index greater than the or equal to the given index.
|
||||
// The returned chunks are sorted in ascending order by their index.
|
||||
func (o *Chunk) GetChunksGEIndex(ctx context.Context, index uint64, limit int) ([]*Chunk, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&Chunk{})
|
||||
db = db.Where("index >= ?", index)
|
||||
db = db.Order("index ASC")
|
||||
|
||||
if limit > 0 {
|
||||
db = db.Limit(limit)
|
||||
}
|
||||
|
||||
var chunks []*Chunk
|
||||
if err := db.Find(&chunks).Error; err != nil {
|
||||
return nil, fmt.Errorf("Chunk.GetChunksGEIndex error: %w", err)
|
||||
}
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
// InsertChunk inserts a new chunk into the database.
|
||||
func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*gorm.DB) (*Chunk, error) {
|
||||
if chunk == nil || len(chunk.Blocks) == 0 {
|
||||
|
||||
@@ -64,13 +64,14 @@ func (o *L2Block) GetL2BlocksLatestHeight(ctx context.Context) (uint64, error) {
|
||||
return maxNumber, nil
|
||||
}
|
||||
|
||||
// GetUnchunkedBlocks get the l2 blocks that have not been put into a chunk.
|
||||
// GetL2WrappedBlocksGEHeight retrieves L2 blocks that have a block number greater than or equal to the given height.
|
||||
// The blocks are converted into WrappedBlock format for output.
|
||||
// The returned blocks are sorted in ascending order by their block number.
|
||||
func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.WrappedBlock, error) {
|
||||
func (o *L2Block) GetL2WrappedBlocksGEHeight(ctx context.Context, height uint64, limit int) ([]*types.WrappedBlock, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&L2Block{})
|
||||
db = db.Select("header, transactions, withdraw_root, row_consumption")
|
||||
db = db.Where("chunk_hash IS NULL")
|
||||
db = db.Where("number >= ?", height)
|
||||
db = db.Order("number ASC")
|
||||
|
||||
if limit > 0 {
|
||||
@@ -79,7 +80,7 @@ func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.W
|
||||
|
||||
var l2Blocks []L2Block
|
||||
if err := db.Find(&l2Blocks).Error; err != nil {
|
||||
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
|
||||
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
|
||||
}
|
||||
|
||||
var wrappedBlocks []*types.WrappedBlock
|
||||
@@ -87,18 +88,18 @@ func (o *L2Block) GetUnchunkedBlocks(ctx context.Context, limit int) ([]*types.W
|
||||
var wrappedBlock types.WrappedBlock
|
||||
|
||||
if err := json.Unmarshal([]byte(v.Transactions), &wrappedBlock.Transactions); err != nil {
|
||||
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
|
||||
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
|
||||
}
|
||||
|
||||
wrappedBlock.Header = &gethTypes.Header{}
|
||||
if err := json.Unmarshal([]byte(v.Header), wrappedBlock.Header); err != nil {
|
||||
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
|
||||
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
|
||||
}
|
||||
|
||||
wrappedBlock.WithdrawRoot = common.HexToHash(v.WithdrawRoot)
|
||||
|
||||
if err := json.Unmarshal([]byte(v.RowConsumption), &wrappedBlock.RowConsumption); err != nil {
|
||||
return nil, fmt.Errorf("L2Block.GetUnchunkedBlocks error: %w", err)
|
||||
return nil, fmt.Errorf("L2Block.GetL2WrappedBlocksGEHeight error: %w", err)
|
||||
}
|
||||
|
||||
wrappedBlocks = append(wrappedBlocks, &wrappedBlock)
|
||||
|
||||
@@ -101,25 +101,26 @@ func TestL2BlockOrm(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint64(3), height)
|
||||
|
||||
blocks, err := l2BlockOrm.GetUnchunkedBlocks(context.Background(), 0)
|
||||
blocks, err := l2BlockOrm.GetL2Blocks(context.Background(), map[string]interface{}{}, []string{}, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, blocks, 2)
|
||||
assert.Equal(t, wrappedBlock1, blocks[0])
|
||||
assert.Equal(t, wrappedBlock2, blocks[1])
|
||||
assert.Equal(t, "", blocks[0].ChunkHash)
|
||||
assert.Equal(t, "", blocks[1].ChunkHash)
|
||||
|
||||
blocks, err = l2BlockOrm.GetL2BlocksInRange(context.Background(), 2, 3)
|
||||
wrappedBlocks, err := l2BlockOrm.GetL2BlocksInRange(context.Background(), 2, 3)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, blocks, 2)
|
||||
assert.Equal(t, wrappedBlock1, blocks[0])
|
||||
assert.Equal(t, wrappedBlock2, blocks[1])
|
||||
assert.Equal(t, wrappedBlock1, wrappedBlocks[0])
|
||||
assert.Equal(t, wrappedBlock2, wrappedBlocks[1])
|
||||
|
||||
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 2, 2, "test hash")
|
||||
assert.NoError(t, err)
|
||||
|
||||
blocks, err = l2BlockOrm.GetUnchunkedBlocks(context.Background(), 0)
|
||||
blocks, err = l2BlockOrm.GetL2Blocks(context.Background(), map[string]interface{}{}, []string{}, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, blocks, 1)
|
||||
assert.Equal(t, wrappedBlock2, blocks[0])
|
||||
assert.Len(t, blocks, 2)
|
||||
assert.Equal(t, "test hash", blocks[0].ChunkHash)
|
||||
assert.Equal(t, "", blocks[1].ChunkHash)
|
||||
}
|
||||
|
||||
func TestChunkOrm(t *testing.T) {
|
||||
@@ -135,11 +136,13 @@ func TestChunkOrm(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, dbChunk2.Hash, chunkHash2.Hex())
|
||||
|
||||
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, chunks, 2)
|
||||
assert.Equal(t, chunkHash1.Hex(), chunks[0].Hash)
|
||||
assert.Equal(t, chunkHash2.Hex(), chunks[1].Hash)
|
||||
assert.Equal(t, "", chunks[0].BatchHash)
|
||||
assert.Equal(t, "", chunks[1].BatchHash)
|
||||
|
||||
err = chunkOrm.UpdateProvingStatus(context.Background(), chunkHash1.Hex(), types.ProvingTaskVerified)
|
||||
assert.NoError(t, err)
|
||||
@@ -156,9 +159,13 @@ func TestChunkOrm(t *testing.T) {
|
||||
|
||||
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, "test hash")
|
||||
assert.NoError(t, err)
|
||||
chunks, err = chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
chunks, err = chunkOrm.GetChunksGEIndex(context.Background(), 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, chunks, 1)
|
||||
assert.Len(t, chunks, 2)
|
||||
assert.Equal(t, chunkHash1.Hex(), chunks[0].Hash)
|
||||
assert.Equal(t, chunkHash2.Hex(), chunks[1].Hash)
|
||||
assert.Equal(t, "test hash", chunks[0].BatchHash)
|
||||
assert.Equal(t, "", chunks[1].BatchHash)
|
||||
}
|
||||
|
||||
func TestBatchOrm(t *testing.T) {
|
||||
|
||||
@@ -66,8 +66,12 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
|
||||
}, db, nil)
|
||||
cp.TryProposeChunk()
|
||||
|
||||
batchOrm := orm.NewBatch(db)
|
||||
unbatchedChunkIndex, err := batchOrm.GetFirstUnbatchedChunkIndex(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
chunkOrm := orm.NewChunk(db)
|
||||
chunks, err := chunkOrm.GetUnbatchedChunks(context.Background(), 0)
|
||||
chunks, err := chunkOrm.GetChunksGEIndex(context.Background(), unbatchedChunkIndex, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, chunks, 1)
|
||||
|
||||
@@ -81,7 +85,6 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
|
||||
|
||||
l2Relayer.ProcessPendingBatches()
|
||||
|
||||
batchOrm := orm.NewBatch(db)
|
||||
batch, err := batchOrm.GetLatestBatch(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, batch)
|
||||
|
||||
@@ -211,6 +211,7 @@ func (a *ProofMsg) PublicKey() (string, error) {
|
||||
|
||||
// TaskMsg is a wrapper type around db ProveTask type.
|
||||
type TaskMsg struct {
|
||||
UUID string `json:"uuid"`
|
||||
ID string `json:"id"`
|
||||
Type ProofType `json:"type,omitempty"`
|
||||
BatchTaskDetail *BatchTaskDetail `json:"batch_task_detail,omitempty"`
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
var tag = "v4.1.116"
|
||||
var tag = "v4.2.0"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -47,7 +47,10 @@ require (
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
)
|
||||
|
||||
require github.com/prometheus/client_golang v1.14.0
|
||||
require (
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
|
||||
@@ -73,6 +73,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/iden3/go-iden3-crypto v0.0.15 h1:4MJYlrot1l31Fzlo2sF56u7EVFeHHJkxGXXZCtESgK4=
|
||||
|
||||
@@ -163,16 +163,14 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
|
||||
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
|
||||
err := c.db.Transaction(func(tx *gorm.DB) error {
|
||||
// update prover task proving status as ProverProofInvalid
|
||||
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
||||
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
|
||||
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
||||
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, assignedProverTask.UUID, types.ProverProofInvalid, tx); err != nil {
|
||||
log.Error("update prover task proving status failure", "uuid", assignedProverTask.UUID, "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// update prover task failure type
|
||||
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
||||
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
|
||||
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
||||
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, assignedProverTask.UUID, types.ProverTaskFailureTypeTimeout, tx); err != nil {
|
||||
log.Error("update prover task failure type failure", "uuid", assignedProverTask.UUID, "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -128,13 +128,13 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = bp.proverTaskOrm.SetProverTask(ctx, &proverTask); err != nil {
|
||||
if err = bp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
|
||||
bp.recoverProvingStatus(ctx, batchTask)
|
||||
log.Error("db set session info fail", "task hash", batchTask.Hash, "prover name", proverName.(string), "prover pubKey", publicKey.(string), "err", err)
|
||||
log.Error("insert batch prover task info fail", "taskID", batchTask.Hash, "publicKey", publicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
taskMsg, err := bp.formatProverTask(ctx, batchTask.Hash)
|
||||
taskMsg, err := bp.formatProverTask(ctx, &proverTask)
|
||||
if err != nil {
|
||||
bp.recoverProvingStatus(ctx, batchTask)
|
||||
log.Error("format prover task failure", "hash", batchTask.Hash, "err", err)
|
||||
@@ -146,11 +146,11 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
return taskMsg, nil
|
||||
}
|
||||
|
||||
func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string) (*coordinatorType.GetTaskSchema, error) {
|
||||
func (bp *BatchProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask) (*coordinatorType.GetTaskSchema, error) {
|
||||
// get chunk from db
|
||||
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, taskID)
|
||||
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, task.TaskID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get chunk proofs for batch task id:%s err:%w ", taskID, err)
|
||||
err = fmt.Errorf("failed to get chunk proofs for batch task id:%s err:%w ", task.TaskID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -159,7 +159,7 @@ func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string)
|
||||
for _, chunk := range chunks {
|
||||
var proof message.ChunkProof
|
||||
if encodeErr := json.Unmarshal(chunk.Proof, &proof); encodeErr != nil {
|
||||
return nil, fmt.Errorf("Chunk.GetProofsByBatchHash unmarshal proof error: %w, batch hash: %v, chunk hash: %v", encodeErr, taskID, chunk.Hash)
|
||||
return nil, fmt.Errorf("Chunk.GetProofsByBatchHash unmarshal proof error: %w, batch hash: %v, chunk hash: %v", encodeErr, task.TaskID, chunk.Hash)
|
||||
}
|
||||
chunkProofs = append(chunkProofs, &proof)
|
||||
|
||||
@@ -181,11 +181,12 @@ func (bp *BatchProverTask) formatProverTask(ctx context.Context, taskID string)
|
||||
|
||||
chunkProofsBytes, err := json.Marshal(taskDetail)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal chunk proofs, taskID:%s err:%w", taskID, err)
|
||||
return nil, fmt.Errorf("failed to marshal chunk proofs, taskID:%s err:%w", task.TaskID, err)
|
||||
}
|
||||
|
||||
taskMsg := &coordinatorType.GetTaskSchema{
|
||||
TaskID: taskID,
|
||||
UUID: task.UUID.String(),
|
||||
TaskID: task.TaskID,
|
||||
TaskType: int(message.ProofTypeBatch),
|
||||
TaskData: string(chunkProofsBytes),
|
||||
}
|
||||
|
||||
@@ -132,13 +132,14 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
// here why need use UTC time. see scroll/common/databased/db.go
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
if err = cp.proverTaskOrm.SetProverTask(ctx, &proverTask); err != nil {
|
||||
|
||||
if err = cp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
|
||||
cp.recoverProvingStatus(ctx, chunkTask)
|
||||
log.Error("db set session info fail", "task hash", chunkTask.Hash, "prover name", proverName.(string), "prover pubKey", publicKey.(string), "err", err)
|
||||
log.Error("insert chunk prover task fail", "taskID", chunkTask.Hash, "publicKey", publicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
taskMsg, err := cp.formatProverTask(ctx, chunkTask.Hash)
|
||||
taskMsg, err := cp.formatProverTask(ctx, &proverTask)
|
||||
if err != nil {
|
||||
cp.recoverProvingStatus(ctx, chunkTask)
|
||||
log.Error("format prover task failure", "hash", chunkTask.Hash, "err", err)
|
||||
@@ -150,11 +151,11 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
return taskMsg, nil
|
||||
}
|
||||
|
||||
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, hash string) (*coordinatorType.GetTaskSchema, error) {
|
||||
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask) (*coordinatorType.GetTaskSchema, error) {
|
||||
// Get block hashes.
|
||||
wrappedBlocks, wrappedErr := cp.blockOrm.GetL2BlocksByChunkHash(ctx, hash)
|
||||
wrappedBlocks, wrappedErr := cp.blockOrm.GetL2BlocksByChunkHash(ctx, task.TaskID)
|
||||
if wrappedErr != nil || len(wrappedBlocks) == 0 {
|
||||
return nil, fmt.Errorf("failed to fetch wrapped blocks, batch hash:%s err:%w", hash, wrappedErr)
|
||||
return nil, fmt.Errorf("failed to fetch wrapped blocks, chunk hash:%s err:%w", task.TaskID, wrappedErr)
|
||||
}
|
||||
|
||||
blockHashes := make([]common.Hash, len(wrappedBlocks))
|
||||
@@ -167,11 +168,12 @@ func (cp *ChunkProverTask) formatProverTask(ctx context.Context, hash string) (*
|
||||
}
|
||||
blockHashesBytes, err := json.Marshal(taskDetail)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal block hashes hash:%s, err:%w", hash, err)
|
||||
return nil, fmt.Errorf("failed to marshal block hashes hash:%s, err:%w", task.TaskID, err)
|
||||
}
|
||||
|
||||
proverTaskSchema := &coordinatorType.GetTaskSchema{
|
||||
TaskID: hash,
|
||||
UUID: task.UUID.String(),
|
||||
TaskID: task.TaskID,
|
||||
TaskType: int(message.ProofTypeChunk),
|
||||
TaskData: string(blockHashesBytes),
|
||||
}
|
||||
|
||||
@@ -57,10 +57,6 @@ func (b *BaseProverTask) checkAttemptsExceeded(hash string, taskType message.Pro
|
||||
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
|
||||
}
|
||||
}
|
||||
// update the prover task status to let timeout checker don't check it.
|
||||
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
|
||||
log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if transErr == nil {
|
||||
|
||||
@@ -135,10 +135,21 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
|
||||
return fmt.Errorf("get ProverVersion from context failed")
|
||||
}
|
||||
|
||||
proverTask, err := m.proverTaskOrm.GetProverTaskByTaskIDAndProver(ctx, proofMsg.Type, proofMsg.ID, pk, pv)
|
||||
if proverTask == nil || err != nil {
|
||||
log.Error("get none prover task for the proof", "key", pk, "taskID", proofMsg.ID, "error", err)
|
||||
return ErrValidatorFailureProverTaskEmpty
|
||||
var proverTask *orm.ProverTask
|
||||
var err error
|
||||
if proofParameter.UUID != "" {
|
||||
proverTask, err = m.proverTaskOrm.GetProverTaskByUUIDAndPublicKey(ctx, proofParameter.UUID, pk)
|
||||
if proverTask == nil || err != nil {
|
||||
log.Error("get none prover task for the proof", "uuid", proofParameter.UUID, "key", pk, "taskID", proofMsg.ID, "error", err)
|
||||
return ErrValidatorFailureProverTaskEmpty
|
||||
}
|
||||
} else {
|
||||
// TODO When prover all have upgrade, need delete this logic
|
||||
proverTask, err = m.proverTaskOrm.GetAssignedProverTaskByTaskIDAndProver(ctx, proofMsg.Type, proofMsg.ID, pk, pv)
|
||||
if proverTask == nil || err != nil {
|
||||
log.Error("get none prover task for the proof", "key", pk, "taskID", proofMsg.ID, "error", err)
|
||||
return ErrValidatorFailureProverTaskEmpty
|
||||
}
|
||||
}
|
||||
|
||||
proofTime := time.Since(proverTask.CreatedAt)
|
||||
@@ -163,7 +174,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
|
||||
|
||||
if verifyErr != nil || !success {
|
||||
m.verifierFailureTotal.WithLabelValues(pv).Inc()
|
||||
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
|
||||
m.proofRecover(ctx, proverTask, proofMsg)
|
||||
|
||||
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
|
||||
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
@@ -179,9 +190,9 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
|
||||
log.Info("proof verified and valid", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
|
||||
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec)
|
||||
|
||||
if err := m.closeProofTask(ctx, proofMsg.ID, pk, proofMsg, proofTimeSec); err != nil {
|
||||
if err := m.closeProofTask(ctx, proverTask, proofMsg, proofTimeSec); err != nil {
|
||||
m.proofSubmitFailure.Inc()
|
||||
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
|
||||
m.proofRecover(ctx, proverTask, proofMsg)
|
||||
return ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
@@ -239,14 +250,14 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
|
||||
// Verify if the proving task has already been assigned to another prover.
|
||||
// Upon receiving an error message, it's possible the proving status has been reset by another prover
|
||||
// and the task has been reassigned. In this case, the coordinator should avoid resetting the proving status.
|
||||
m.processProverErr(ctx, proofMsg.ID, pk, proofMsg.Type)
|
||||
m.processProverErr(ctx, proverTask)
|
||||
|
||||
m.validateFailureProverTaskStatusNotOk.Inc()
|
||||
|
||||
log.Info("proof generated by prover failed",
|
||||
"taskType", proofMsg.Type, "hash", proofMsg.ID, "proverName", proverTask.ProverName,
|
||||
"proverVersion", proverTask.ProverVersion, "proverPublicKey", pk, "failureType", proofParameter.FailureType,
|
||||
"failureMessage", "failureMessage", failureMsg)
|
||||
"failureMessage", failureMsg)
|
||||
return ErrValidatorFailureProofMsgStatusNotOk
|
||||
}
|
||||
|
||||
@@ -259,7 +270,7 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
|
||||
}
|
||||
|
||||
// store the proof to prover task
|
||||
if updateTaskProofErr := m.updateProverTaskProof(ctx, pk, proofMsg); updateTaskProofErr != nil {
|
||||
if updateTaskProofErr := m.updateProverTaskProof(ctx, proverTask, proofMsg); updateTaskProofErr != nil {
|
||||
log.Warn("update prover task proof failure", "hash", proofMsg.ID, "proverPublicKey", pk,
|
||||
"taskType", proverTask.TaskType, "proverName", proverTask.ProverName, "error", updateTaskProofErr)
|
||||
}
|
||||
@@ -274,28 +285,28 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
|
||||
log.Info("proof recover update proof status", "hash", hash, "proverPublicKey", pubKey,
|
||||
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) {
|
||||
log.Info("proof recover update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
|
||||
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskUnassigned.String())
|
||||
|
||||
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
|
||||
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", hash, "pubKey", pubKey, "error", err)
|
||||
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
|
||||
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
|
||||
log.Info("proof close task update proof status", "hash", hash, "proverPublicKey", pubKey,
|
||||
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
|
||||
log.Info("proof close task update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
|
||||
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskVerified.String())
|
||||
|
||||
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
|
||||
log.Error("failed to updated proof status ProvingTaskVerified", "hash", hash, "proverPublicKey", pubKey, "error", err)
|
||||
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
|
||||
log.Error("failed to updated proof status ProvingTaskVerified", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "error", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateProofStatus update the chunk/batch task and session info status
|
||||
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, proverPublicKey string, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
|
||||
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
|
||||
var proverTaskStatus types.ProverProveStatus
|
||||
switch status {
|
||||
case types.ProvingTaskFailed, types.ProvingTaskUnassigned:
|
||||
@@ -305,13 +316,13 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
|
||||
}
|
||||
|
||||
err := m.db.Transaction(func(tx *gorm.DB) error {
|
||||
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proofMsg.Type, hash, proverPublicKey, proverTaskStatus, tx); updateErr != nil {
|
||||
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proverTask.UUID, proverTaskStatus, tx); updateErr != nil {
|
||||
return updateErr
|
||||
}
|
||||
|
||||
// if the block batch has proof verified, so the failed status not update block batch proving status
|
||||
if m.checkIsTaskSuccess(ctx, hash, proofMsg.Type) {
|
||||
log.Info("update proof status skip because this chunk / batch has been verified", "hash", hash, "public key", proverPublicKey)
|
||||
if m.checkIsTaskSuccess(ctx, proverTask.TaskID, proofMsg.Type) {
|
||||
log.Info("update proof status skip because this chunk / batch has been verified", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -324,20 +335,20 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
|
||||
storeProofErr = m.batchOrm.UpdateProofByHash(ctx, proofMsg.ID, proofMsg.BatchProof, proofTimeSec, tx)
|
||||
}
|
||||
if storeProofErr != nil {
|
||||
log.Error("failed to store chunk/batch proof into db", "hash", hash, "public key", proverPublicKey, "error", storeProofErr)
|
||||
log.Error("failed to store chunk/batch proof into db", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey, "error", storeProofErr)
|
||||
return storeProofErr
|
||||
}
|
||||
}
|
||||
|
||||
switch proofMsg.Type {
|
||||
case message.ProofTypeChunk:
|
||||
if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
|
||||
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
|
||||
if err := m.chunkOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
|
||||
log.Error("failed to update chunk proving_status as failed", "hash", proverTask.TaskID, "error", err)
|
||||
return err
|
||||
}
|
||||
case message.ProofTypeBatch:
|
||||
if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
|
||||
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
|
||||
if err := m.batchOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
|
||||
log.Error("failed to update batch proving_status as failed", "hash", proverTask.TaskID, "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -349,7 +360,7 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
|
||||
}
|
||||
|
||||
if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
|
||||
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
|
||||
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, proverTask.TaskID); checkReadyErr != nil {
|
||||
log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
|
||||
return checkReadyErr
|
||||
}
|
||||
@@ -378,14 +389,15 @@ func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string
|
||||
return provingStatus == types.ProvingTaskVerified
|
||||
}
|
||||
|
||||
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk string, taskType message.ProofType) {
|
||||
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, taskType, taskID, pk, types.ProverProofInvalid); updateErr != nil {
|
||||
log.Error("update prover task proving status failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", updateErr)
|
||||
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, proverTask *orm.ProverTask) {
|
||||
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proverTask.UUID, types.ProverProofInvalid); updateErr != nil {
|
||||
log.Error("update prover task proving status failure", "uuid", proverTask.UUID, "taskID", proverTask.TaskID, "proverPublicKey",
|
||||
proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", updateErr)
|
||||
}
|
||||
|
||||
proverTasks, err := m.proverTaskOrm.GetAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk)
|
||||
proverTasks, err := m.proverTaskOrm.GetAssignedTaskOfOtherProvers(ctx, message.ProofType(proverTask.TaskType), proverTask.TaskID, proverTask.ProverPublicKey)
|
||||
if err != nil {
|
||||
log.Warn("checkIsAssignedToOtherProver failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
|
||||
log.Warn("checkIsAssignedToOtherProver failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -393,19 +405,19 @@ func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk st
|
||||
return
|
||||
}
|
||||
|
||||
switch taskType {
|
||||
switch message.ProofType(proverTask.TaskType) {
|
||||
case message.ProofTypeChunk:
|
||||
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("failed to update chunk proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
|
||||
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("failed to update chunk proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
|
||||
}
|
||||
case message.ProofTypeBatch:
|
||||
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("failed to update batch proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
|
||||
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("failed to update batch proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, pk string, proofMsg *message.ProofMsg) error {
|
||||
func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) error {
|
||||
// store the proof to prover task
|
||||
var proofBytes []byte
|
||||
var marshalErr error
|
||||
@@ -419,5 +431,5 @@ func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, pk strin
|
||||
if len(proofBytes) == 0 || marshalErr != nil {
|
||||
return fmt.Errorf("updateProverTaskProof marshal proof error:%w", marshalErr)
|
||||
}
|
||||
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proofMsg.Type, proofMsg.ID, pk, proofBytes)
|
||||
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
|
||||
}
|
||||
|
||||
@@ -306,15 +306,8 @@ func (o *Chunk) UpdateProvingStatus(ctx context.Context, hash string, status typ
|
||||
func (o *Chunk) UpdateProvingStatusFromProverError(ctx context.Context, hash string, status types.ProvingStatus) error {
|
||||
updateFields := make(map[string]interface{})
|
||||
updateFields["proving_status"] = int(status)
|
||||
updateFields["prover_assigned_at"] = nil
|
||||
|
||||
switch status {
|
||||
case types.ProvingTaskAssigned:
|
||||
updateFields["prover_assigned_at"] = time.Now()
|
||||
case types.ProvingTaskUnassigned:
|
||||
updateFields["prover_assigned_at"] = nil
|
||||
case types.ProvingTaskVerified:
|
||||
updateFields["proved_at"] = time.Now()
|
||||
}
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&Chunk{})
|
||||
db = db.Where("hash", hash).Where("proving_status", types.ProvingTaskAssigned)
|
||||
|
||||
@@ -77,12 +77,13 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
|
||||
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
|
||||
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
|
||||
assert.NoError(t, err)
|
||||
proverTasks, err := proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(proverTasks))
|
||||
assert.Equal(t, proverTask.ProverName, proverTasks[0].ProverName)
|
||||
assert.NotEqual(t, proverTask.UUID.String(), "00000000-0000-0000-0000-000000000000")
|
||||
|
||||
// test decimal reward, get reward
|
||||
resultReward := proverTasks[0].Reward.BigInt()
|
||||
@@ -91,12 +92,8 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
|
||||
proverTask.ProvingStatus = int16(types.ProverProofValid)
|
||||
proverTask.AssignedAt = utils.NowUTC()
|
||||
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
|
||||
assert.NoError(t, err)
|
||||
proverTasks, err = proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(proverTasks))
|
||||
assert.Equal(t, proverTask.ProvingStatus, proverTasks[0].ProvingStatus)
|
||||
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestProverTaskOrmUint256(t *testing.T) {
|
||||
@@ -117,8 +114,9 @@ func TestProverTaskOrmUint256(t *testing.T) {
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
|
||||
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
|
||||
err = proverTaskOrm.InsertProverTask(context.Background(), &proverTask)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, proverTask.UUID.String(), "00000000-0000-0000-0000-000000000000")
|
||||
proverTasksUint256, err := proverTaskOrm.GetProverTasksByHashes(context.Background(), message.ProofTypeChunk, []string{"test-hash"})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(proverTasksUint256))
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/shopspring/decimal"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
@@ -18,7 +19,8 @@ import (
|
||||
type ProverTask struct {
|
||||
db *gorm.DB `gorm:"column:-"`
|
||||
|
||||
ID int64 `json:"id" gorm:"column:id"`
|
||||
ID int64 `json:"id" gorm:"column:id"`
|
||||
UUID uuid.UUID `json:"uuid" gorm:"column:uuid;type:uuid;default:gen_random_uuid()"`
|
||||
|
||||
// prover
|
||||
ProverPublicKey string `json:"prover_public_key" gorm:"column:prover_public_key"`
|
||||
@@ -114,14 +116,16 @@ func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, taskType messag
|
||||
return proverTasks, nil
|
||||
}
|
||||
|
||||
// GetProverTaskByTaskIDAndProver get prover task taskID and public key
|
||||
func (o *ProverTask) GetProverTaskByTaskIDAndProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
|
||||
// GetAssignedProverTaskByTaskIDAndProver get prover task taskID and public key
|
||||
// TODO: when prover all upgrade need DEPRECATED this function
|
||||
func (o *ProverTask) GetAssignedProverTaskByTaskIDAndProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_type", int(taskType))
|
||||
db = db.Where("task_id", taskID)
|
||||
db = db.Where("prover_public_key", proverPublicKey)
|
||||
db = db.Where("prover_version", proverVersion)
|
||||
db = db.Where("proving_status", types.ProverAssigned)
|
||||
|
||||
var proverTask ProverTask
|
||||
err := db.First(&proverTask).Error
|
||||
@@ -131,6 +135,21 @@ func (o *ProverTask) GetProverTaskByTaskIDAndProver(ctx context.Context, taskTyp
|
||||
return &proverTask, nil
|
||||
}
|
||||
|
||||
// GetProverTaskByUUIDAndPublicKey get prover task taskID by uuid and public key
|
||||
func (o *ProverTask) GetProverTaskByUUIDAndPublicKey(ctx context.Context, uuid, publicKey string) (*ProverTask, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("uuid", uuid)
|
||||
db = db.Where("prover_public_key", publicKey)
|
||||
|
||||
var proverTask ProverTask
|
||||
err := db.First(&proverTask).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ProverTask.GetProverTaskByUUID err:%w, uuid:%s publicKey:%s", err, uuid, publicKey)
|
||||
}
|
||||
return &proverTask, nil
|
||||
}
|
||||
|
||||
// GetAssignedTaskOfOtherProvers get the chunk/batch task assigned other provers
|
||||
func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey string) ([]ProverTask, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
@@ -199,96 +218,59 @@ func (o *ProverTask) TaskTimeoutMoreThanOnce(ctx context.Context, taskType messa
|
||||
return false
|
||||
}
|
||||
|
||||
// SetProverTask updates or inserts a ProverTask record.
|
||||
func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
|
||||
// InsertProverTask insert a prover Task record
|
||||
func (o *ProverTask) InsertProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
|
||||
db := o.db.WithContext(ctx)
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
|
||||
db = db.Clauses(clause.Returning{})
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "task_type"}, {Name: "task_id"}, {Name: "prover_public_key"}, {Name: "prover_version"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"proving_status", "failure_type", "assigned_at"}),
|
||||
})
|
||||
|
||||
if err := db.Create(&proverTask).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.SetProverTask error: %w, prover task: %v", err, proverTask)
|
||||
if err := db.Create(proverTask).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.InsertProverTask error: %w, prover task: %v", err, proverTask)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateProverTaskProof update the prover task's proof
|
||||
func (o *ProverTask) UpdateProverTaskProof(ctx context.Context, proofType message.ProofType, taskID string, pk string, proof []byte) error {
|
||||
func (o *ProverTask) UpdateProverTaskProof(ctx context.Context, uuid uuid.UUID, proof []byte) error {
|
||||
db := o.db
|
||||
db = db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk)
|
||||
|
||||
db = db.Where("uuid = ?", uuid)
|
||||
if err := db.Update("proof", proof).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskProof error: %w, proof type: %v, taskID: %v, prover public key: %v", err, proofType.String(), taskID, pk)
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskProof error: %w, uuid: %v", err, uuid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateProverTaskProvingStatus updates the proving_status of a specific ProverTask record.
|
||||
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, uuid uuid.UUID, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
db = db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk)
|
||||
db = db.Where("uuid = ?", uuid)
|
||||
|
||||
if err := db.Update("proving_status", status).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, proof type: %v, taskID: %v, prover public key: %v, status: %v", err, proofType.String(), taskID, pk, status.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAllProverTaskProvingStatusOfTaskID updates all the proving_status of a specific task id.
|
||||
func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
db = db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_type = ? AND task_id = ?", int(proofType), taskID)
|
||||
|
||||
if err := db.Update("proving_status", status).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.UpdateAllProverTaskProvingStatusOfTaskID error: %w, proof type: %v, taskID: %v, status: %v", err, proofType.String(), taskID, status.String())
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, uuid:%s, status: %v", err, uuid, status.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateProverTaskFailureType update the prover task failure type
|
||||
func (o *ProverTask) UpdateProverTaskFailureType(ctx context.Context, proofType message.ProofType, taskID string, pk string, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
|
||||
func (o *ProverTask) UpdateProverTaskFailureType(ctx context.Context, uuid uuid.UUID, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
db = db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_id", taskID).Where("prover_public_key", pk).Where("task_type", int(proofType))
|
||||
db = db.Where("uuid", uuid)
|
||||
if err := db.Update("failure_type", int(failureType)).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskFailureType error: %w, proof type: %v, taskID: %v, prover public key: %v, failure type: %v", err, proofType.String(), taskID, pk, failureType.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAllProverTaskFailureTypeOfTaskID update the prover task failure type
|
||||
func (o *ProverTask) UpdateAllProverTaskFailureTypeOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
}
|
||||
db = db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_id", taskID).Where("task_type", int(proofType))
|
||||
if err := db.Update("failure_type", int(failureType)).Error; err != nil {
|
||||
return fmt.Errorf("ProverTask.UpdateAllProverTaskFailureTypeOfTaskID error: %w, proof type: %v, taskID: %v, failure type: %v", err, proofType.String(), taskID, failureType.String())
|
||||
return fmt.Errorf("ProverTask.UpdateProverTaskFailureType error: %w, uuid:%s, failure type: %v", err, uuid.String(), failureType.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ type GetTaskParameter struct {
|
||||
|
||||
// GetTaskSchema the schema data return to prover for get prover task
|
||||
type GetTaskSchema struct {
|
||||
UUID string `json:"uuid"`
|
||||
TaskID string `json:"task_id"`
|
||||
TaskType int `json:"task_type"`
|
||||
TaskData string `json:"task_data"`
|
||||
|
||||
@@ -2,6 +2,8 @@ package types
|
||||
|
||||
// SubmitProofParameter the SubmitProof api request parameter
|
||||
type SubmitProofParameter struct {
|
||||
// TODO when prover have upgrade, need change this field to required
|
||||
UUID string `form:"uuid" json:"uuid"`
|
||||
TaskID string `form:"task_id" json:"task_id" binding:"required"`
|
||||
TaskType int `form:"task_type" json:"task_type" binding:"required"`
|
||||
Status int `form:"status" json:"status"`
|
||||
|
||||
@@ -63,7 +63,7 @@ func testResetDB(t *testing.T) {
|
||||
cur, err := Current(pgDB.DB)
|
||||
assert.NoError(t, err)
|
||||
// total number of tables.
|
||||
assert.Equal(t, 8, int(cur))
|
||||
assert.Equal(t, 9, int(cur))
|
||||
}
|
||||
|
||||
func testMigrate(t *testing.T) {
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
ALTER TABLE prover_task ADD COLUMN uuid uuid DEFAULT gen_random_uuid() NOT NULL UNIQUE;
|
||||
|
||||
create index if not exists idx_uuid on prover_task (uuid) where deleted_at IS NULL;
|
||||
|
||||
ALTER TABLE prover_task DROP CONSTRAINT uk_tasktype_taskid_publickey_version;
|
||||
|
||||
drop index if exists uk_tasktype_taskid_publickey_version;
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
create unique index if not exists uk_tasktype_taskid_publickey_version
|
||||
on prover_task (task_type, task_id, prover_public_key, prover_version) where deleted_at IS NULL;
|
||||
-- +goose StatementEnd
|
||||
@@ -51,6 +51,7 @@ type GetTaskResponse struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
ErrMsg string `json:"errmsg"`
|
||||
Data *struct {
|
||||
UUID string `json:"uuid"`
|
||||
TaskID string `json:"task_id"`
|
||||
TaskType int `json:"task_type"`
|
||||
TaskData string `json:"task_data"`
|
||||
@@ -59,6 +60,7 @@ type GetTaskResponse struct {
|
||||
|
||||
// SubmitProofRequest defines the request structure for the SubmitProof API.
|
||||
type SubmitProofRequest struct {
|
||||
UUID string `json:"uuid"`
|
||||
TaskID string `json:"task_id"`
|
||||
TaskType int `json:"task_type"`
|
||||
Status int `json:"status"`
|
||||
|
||||
@@ -4,6 +4,7 @@ go 1.19
|
||||
|
||||
require (
|
||||
github.com/go-resty/resty/v2 v2.7.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230812030736-25fe3ba69a28
|
||||
github.com/stretchr/testify v1.8.3
|
||||
github.com/urfave/cli/v2 v2.25.7
|
||||
@@ -19,7 +20,6 @@ require (
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-stack/stack v1.8.1 // indirect
|
||||
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/holiman/uint256 v1.2.3 // indirect
|
||||
github.com/huin/goupnp v1.0.3 // indirect
|
||||
|
||||
@@ -164,7 +164,7 @@ func (r *Prover) proveAndSubmit() error {
|
||||
log.Error("failed to prove task", "task_type", task.Task.Type, "task-id", task.Task.ID, "err", err)
|
||||
return r.submitErr(task, message.ProofFailureNoPanic, err)
|
||||
}
|
||||
return r.submitProof(proofMsg)
|
||||
return r.submitProof(proofMsg, task.Task.UUID)
|
||||
}
|
||||
|
||||
// if tried times >= 3, it's probably due to circuit proving panic
|
||||
@@ -203,6 +203,7 @@ func (r *Prover) fetchTaskFromCoordinator() (*store.ProvingTask, error) {
|
||||
|
||||
// create a new TaskMsg
|
||||
taskMsg := message.TaskMsg{
|
||||
UUID: resp.Data.UUID,
|
||||
ID: resp.Data.TaskID,
|
||||
Type: message.ProofType(resp.Data.TaskType),
|
||||
}
|
||||
@@ -295,9 +296,10 @@ func (r *Prover) proveBatch(task *store.ProvingTask) (*message.BatchProof, error
|
||||
return r.proverCore.ProveBatch(task.Task.ID, task.Task.BatchTaskDetail.ChunkInfos, task.Task.BatchTaskDetail.ChunkProofs)
|
||||
}
|
||||
|
||||
func (r *Prover) submitProof(msg *message.ProofDetail) error {
|
||||
func (r *Prover) submitProof(msg *message.ProofDetail, uuid string) error {
|
||||
// prepare the submit request
|
||||
req := &client.SubmitProofRequest{
|
||||
UUID: uuid,
|
||||
TaskID: msg.ID,
|
||||
TaskType: int(msg.Type),
|
||||
Status: int(msg.Status),
|
||||
@@ -344,6 +346,7 @@ func (r *Prover) submitProof(msg *message.ProofDetail) error {
|
||||
func (r *Prover) submitErr(task *store.ProvingTask, proofFailureType message.ProofFailureType, err error) error {
|
||||
// prepare the submit request
|
||||
req := &client.SubmitProofRequest{
|
||||
UUID: task.Task.UUID,
|
||||
TaskID: task.Task.ID,
|
||||
TaskType: int(task.Task.Type),
|
||||
Status: int(message.StatusProofError),
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
@@ -23,9 +24,12 @@ func TestStack(t *testing.T) {
|
||||
defer s.Close()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
taskUUID, uuidErr := uuid.NewRandom()
|
||||
assert.NoError(t, uuidErr)
|
||||
task := &ProvingTask{
|
||||
Task: &message.TaskMsg{
|
||||
ID: strconv.Itoa(i),
|
||||
UUID: taskUUID.String(),
|
||||
ID: strconv.Itoa(i),
|
||||
},
|
||||
Times: 0,
|
||||
}
|
||||
@@ -44,9 +48,12 @@ func TestStack(t *testing.T) {
|
||||
}
|
||||
|
||||
// test times
|
||||
taskUUID, uuidErr := uuid.NewRandom()
|
||||
assert.NoError(t, uuidErr)
|
||||
task := &ProvingTask{
|
||||
Task: &message.TaskMsg{
|
||||
ID: strconv.Itoa(1),
|
||||
UUID: taskUUID.String(),
|
||||
ID: strconv.Itoa(1),
|
||||
},
|
||||
Times: 0,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user