Compare commits

..

13 Commits

Author SHA1 Message Date
georgehao
06dd3f6100 feat: update 2023-08-17 17:46:00 +08:00
georgehao
aa6c216405 feat: fix bug 2023-08-17 17:08:50 +08:00
colinlyguo
308f8f0502 format 2023-08-17 15:34:34 +08:00
colin
e92e2d54ba Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-17 15:29:47 +08:00
colinlyguo
a7870fc19c add ProverTaskFailureTypeValidatedFailed failed type 2023-08-16 17:05:30 +08:00
colinlyguo
5dc77d98d9 tweak 2023-08-16 17:00:50 +08:00
colin
158655dadb Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-16 16:50:12 +08:00
colinlyguo
2350802af9 add index 2023-08-16 16:49:08 +08:00
colinlyguo
5e7dbbce71 chore: auto version bump [bot] 2023-08-16 08:31:10 +00:00
colinlyguo
49376db8e3 address comments 2023-08-16 16:30:38 +08:00
colin
360f5ef72b Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-16 15:43:31 +08:00
colinlyguo
a88d1428bc address comments 2023-08-16 15:41:34 +08:00
colinlyguo
5f6e50d883 feat(coordinator): supporting multiple provers per task 2023-08-16 13:51:03 +08:00
18 changed files with 331 additions and 340 deletions

View File

@@ -40,6 +40,8 @@ type Batch struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// rollup
RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -267,6 +269,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
BatchHeader: batchHeader.Encode(),
ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
RollupStatus: int16(types.RollupPending),
OracleStatus: int16(types.GasOraclePending),
}

View File

@@ -37,6 +37,8 @@ type Chunk struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// batch
BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -175,6 +177,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
ParentChunkStateRoot: parentChunkStateRoot,
WithdrawRoot: chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
}
db := o.db

View File

@@ -5,6 +5,7 @@ import (
"math/big"
"testing"
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
"github.com/scroll-tech/go-ethereum/common"
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
@@ -12,7 +13,6 @@ import (
"scroll-tech/common/database"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils"
"scroll-tech/bridge/internal/config"
"scroll-tech/bridge/internal/controller/relayer"
@@ -91,22 +91,20 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.NotEmpty(t, batch.CommitTxHash)
assert.Equal(t, types.RollupCommitting, types.RollupStatus(batch.RollupStatus))
success := utils.TryTimes(30, func() bool {
var receipt *gethTypes.Receipt
receipt, err = l1Client.TransactionReceipt(context.Background(), common.HexToHash(batch.CommitTxHash))
return err == nil && receipt.Status == 1
})
assert.True(t, success)
assert.NoError(t, err)
commitTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(batch.CommitTxHash))
assert.NoError(t, err)
commitTxReceipt, err := bind.WaitMined(context.Background(), l1Client, commitTx)
assert.NoError(t, err)
assert.Equal(t, len(commitTxReceipt.Logs), 1)
// fetch rollup events
success = utils.TryTimes(30, func() bool {
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
var statuses []types.RollupStatus
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
return err == nil && len(statuses) == 1 && types.RollupCommitted == statuses[0]
})
assert.True(t, success)
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
statuses, err := batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupCommitted, statuses[0])
// add dummy proof
proof := &message.BatchProof{
@@ -120,7 +118,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
// process committed batch and check status
l2Relayer.ProcessCommittedBatches()
statuses, err := batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupFinalizing, statuses[0])
@@ -130,20 +128,17 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.NotNil(t, batch)
assert.NotEmpty(t, batch.FinalizeTxHash)
success = utils.TryTimes(30, func() bool {
var receipt *gethTypes.Receipt
receipt, err = l1Client.TransactionReceipt(context.Background(), common.HexToHash(batch.FinalizeTxHash))
return err == nil && receipt.Status == 1
})
assert.True(t, success)
finalizeTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(batch.FinalizeTxHash))
assert.NoError(t, err)
finalizeTxReceipt, err := bind.WaitMined(context.Background(), l1Client, finalizeTx)
assert.NoError(t, err)
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
// fetch rollup events
success = utils.TryTimes(30, func() bool {
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
var statuses []types.RollupStatus
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
return err == nil && len(statuses) == 1 && types.RollupFinalized == statuses[0]
})
assert.True(t, success)
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupFinalized, statuses[0])
}

View File

@@ -103,6 +103,10 @@ const (
ProverTaskFailureTypeUndefined ProverTaskFailureType = iota
// ProverTaskFailureTypeTimeout prover task failure of timeout
ProverTaskFailureTypeTimeout
// ProverTaskFailureTypeValidatedFailed prover task failure of validated failed by coordinator
ProverTaskFailureTypeValidatedFailed
// ProverTaskFailureTypeVerifiedFailed prover task failure of verified failed by coordinator
ProverTaskFailureTypeVerifiedFailed
)
func (r ProverTaskFailureType) String() string {
@@ -111,8 +115,12 @@ func (r ProverTaskFailureType) String() string {
return "prover task failure undefined"
case ProverTaskFailureTypeTimeout:
return "prover task failure timeout"
case ProverTaskFailureTypeValidatedFailed:
return "prover task failure validated failed"
case ProverTaskFailureTypeVerifiedFailed:
return "prover task failure verified failed"
default:
return "illegal prover task failure type"
return fmt.Sprintf("illegal prover task failure type (%d)", int32(r))
}
}

View File

@@ -6,7 +6,7 @@ import (
"strings"
)
var tag = "v4.1.61"
var tag = "v4.1.60"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -146,13 +146,7 @@ func (c *Collector) timeoutChunkProofTask() {
}
func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout prometheus.Counter) {
// here not update the block batch proving status failed, because the collector loop will check
// the attempt times. if reach the times, the collector will set the block batch proving status.
for _, assignedProverTask := range assignedProverTasks {
if c.proverTaskOrm.TaskTimeoutMoreThanOnce(c.ctx, assignedProverTask.TaskID) {
log.Warn("Task timeout more than once", "hash", assignedProverTask.TaskID)
}
timeout.Inc()
log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
@@ -160,26 +154,55 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
// update prover task proving status as ProverProofInvalid
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
log.Error("update prover task proving status failure",
"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
"prover proving status", types.ProverProofInvalid, "err", err)
return err
}
// update prover task failure type
// update prover task failure type as ProverTaskFailureTypeTimeout
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
log.Error("update prover task failure type failure",
"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
"prover failure type", types.ProverTaskFailureTypeTimeout, "err", err)
return err
}
// update the task to unassigned, let collector restart it
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
if err := c.chunkOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of chunk failure", "hash", assignedProverTask.TaskID, "err", err)
return err
}
}
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
if err := c.batchOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of batch failure", "hash", assignedProverTask.TaskID, "err", err)
return err
}
}
var failedAssignmentCount uint64
failedAssignmentCount, err := c.proverTaskOrm.GetFailedTaskAssignmentCount(c.ctx, assignedProverTask.TaskID, tx)
if err != nil {
log.Error("get failed task assignment count failure", "taskID", assignedProverTask.TaskID, "err", err)
return err
}
if failedAssignmentCount >= uint64(c.cfg.ProverManager.SessionAttempts) {
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
log.Error("update chunk proving status failure", "hash", assignedProverTask.TaskID,
"status", types.ProvingTaskFailed, "err", err)
return err
}
}
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
log.Error("update batch proving status failure", "hash", assignedProverTask.TaskID,
"status", types.ProvingTaskFailed, "err", err)
return err
}
}
}
return nil

View File

@@ -24,7 +24,12 @@ import (
// BatchProverTask is prover task implement for batch proof
type BatchProverTask struct {
BaseProverTask
cfg *config.Config
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
proverTaskOrm *orm.ProverTask
batchAttemptsExceedTotal prometheus.Counter
batchTaskGetTaskTotal prometheus.Counter
@@ -33,13 +38,11 @@ type BatchProverTask struct {
// NewBatchProverTask new a batch collector
func NewBatchProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *BatchProverTask {
bp := &BatchProverTask{
BaseProverTask: BaseProverTask{
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
proverTaskOrm: orm.NewProverTask(db),
},
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
proverTaskOrm: orm.NewProverTask(db),
batchAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_batch_attempts_exceed_total",
Help: "Total number of batch attempts exceed.",
@@ -81,27 +84,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
}
batchTasks, err := bp.batchOrm.UpdateUnassignedBatchReturning(ctx, 1)
// load and send a batch task
batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, bp.cfg.ProverManager.ProversPerSession, bp.cfg.ProverManager.SessionAttempts)
if err != nil {
return nil, fmt.Errorf("failed to get unassigned batch proving tasks, error:%w", err)
}
if len(batchTasks) == 0 {
if batchTask == nil {
return nil, nil
}
if len(batchTasks) != 1 {
return nil, fmt.Errorf("get unassigned batch proving task len not 1, batch tasks:%v", batchTasks)
}
batchTask := batchTasks[0]
log.Info("start batch proof generation session", "id", batchTask.Hash, "public key", publicKey, "prover name", proverName)
if !bp.checkAttemptsExceeded(batchTask.Hash, message.ProofTypeBatch) {
bp.batchAttemptsExceedTotal.Inc()
return nil, fmt.Errorf("the batch task id:%s check attempts have reach the maximum", batchTask.Hash)
}
proverTask := orm.ProverTask{
TaskID: batchTask.Hash,
ProverPublicKey: publicKey.(string),

View File

@@ -24,7 +24,12 @@ import (
// ChunkProverTask the chunk prover task
type ChunkProverTask struct {
BaseProverTask
cfg *config.Config
db *gorm.DB
chunkOrm *orm.Chunk
blockOrm *orm.L2Block
proverTaskOrm *orm.ProverTask
chunkAttemptsExceedTotal prometheus.Counter
chunkTaskGetTaskTotal prometheus.Counter
@@ -33,14 +38,11 @@ type ChunkProverTask struct {
// NewChunkProverTask new a chunk prover task
func NewChunkProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *ChunkProverTask {
cp := &ChunkProverTask{
BaseProverTask: BaseProverTask{
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
blockOrm: orm.NewL2Block(db),
proverTaskOrm: orm.NewProverTask(db),
},
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
blockOrm: orm.NewL2Block(db),
proverTaskOrm: orm.NewProverTask(db),
chunkAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_chunk_attempts_exceed_total",
Help: "Total number of chunk attempts exceed.",
@@ -82,29 +84,18 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
}
// load and send chunk tasks
chunkTasks, err := cp.chunkOrm.UpdateUnassignedChunkReturning(ctx, getTaskParameter.ProverHeight, 1)
// load and send a chunk task
chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, cp.cfg.ProverManager.ProversPerSession, cp.cfg.ProverManager.SessionAttempts)
if err != nil {
return nil, fmt.Errorf("failed to get unassigned chunk proving tasks, error:%w", err)
}
if len(chunkTasks) == 0 {
if chunkTask == nil {
return nil, nil
}
if len(chunkTasks) != 1 {
return nil, fmt.Errorf("get unassigned chunk proving task len not 1, chunk tasks:%v", chunkTasks)
}
chunkTask := chunkTasks[0]
log.Info("start chunk generation session", "id", chunkTask.Hash, "public key", publicKey, "prover name", proverName)
if !cp.checkAttemptsExceeded(chunkTask.Hash, message.ProofTypeChunk) {
cp.chunkAttemptsExceedTotal.Inc()
return nil, fmt.Errorf("chunk proof hash id:%s check attempts have reach the maximum", chunkTask.Hash)
}
proverTask := orm.ProverTask{
TaskID: chunkTask.Hash,
ProverPublicKey: publicKey.(string),

View File

@@ -1,17 +1,8 @@
package provertask
import (
"context"
"github.com/gin-gonic/gin"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
)
@@ -19,53 +10,3 @@ import (
type ProverTask interface {
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
}
// BaseProverTask a base prover task which contain series functions
type BaseProverTask struct {
cfg *config.Config
ctx context.Context
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
blockOrm *orm.L2Block
proverTaskOrm *orm.ProverTask
}
// checkAttempts use the count of prover task info to check the attempts
func (b *BaseProverTask) checkAttemptsExceeded(hash string, taskType message.ProofType) bool {
whereFields := make(map[string]interface{})
whereFields["task_id"] = hash
whereFields["task_type"] = int16(taskType)
proverTasks, err := b.proverTaskOrm.GetProverTasks(b.ctx, whereFields, nil, 0, 0)
if err != nil {
log.Error("get prover task error", "hash id", hash, "error", err)
return true
}
if len(proverTasks) >= int(b.cfg.ProverManager.SessionAttempts) {
log.Warn("proof generation prover task reach the max attempts", "hash", hash)
transErr := b.db.Transaction(func(tx *gorm.DB) error {
switch message.ProofType(proverTasks[0].TaskType) {
case message.ProofTypeChunk:
if err := b.chunkOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
}
case message.ProofTypeBatch:
if err := b.batchOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
}
}
// update the prover task status to let timeout checker don't check it.
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
}
return nil
})
if transErr == nil {
return false
}
}
return true
}

View File

@@ -145,6 +145,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
"prover pk", pk, "prove type", proverTask.TaskType, "proof time", proofTimeSec)
if err = m.validator(ctx, proverTask, pk, proofMsg); err != nil {
m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeValidatedFailed)
return err
}
@@ -161,7 +162,8 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
if verifyErr != nil || !success {
m.verifierFailureTotal.WithLabelValues(proverVersion).Inc()
m.proofFailure(ctx, proofMsg.ID, pk, proofMsg)
m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeVerifiedFailed)
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
@@ -186,25 +188,6 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
return nil
}
func (m *ProofReceiverLogic) checkAreAllChunkProofsReady(ctx context.Context, chunkHash string) error {
batchHash, err := m.chunkOrm.GetChunkBatchHash(ctx, chunkHash)
if err != nil {
return err
}
allReady, err := m.chunkOrm.CheckIfBatchChunkProofsAreReady(ctx, batchHash)
if err != nil {
return err
}
if allReady {
err := m.batchOrm.UpdateChunkProofsStatusByBatchHash(ctx, batchHash, types.ChunkProofsStatusReady)
if err != nil {
return err
}
}
return nil
}
func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.ProverTask, pk string, proofMsg *message.ProofMsg) (err error) {
defer func() {
if err != nil {
@@ -267,12 +250,40 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
return nil
}
func (m *ProofReceiverLogic) proofFailure(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
log.Info("proof failure update proof status", "hash", hash, "public key", pubKey,
"proof type", proofMsg.Type.String(), "status", types.ProvingTaskFailed.String())
func (m *ProofReceiverLogic) proofFailure(ctx context.Context, proverTask *orm.ProverTask, failedType types.ProverTaskFailureType) {
err := m.db.Transaction(func(tx *gorm.DB) error {
// update prover task proving status as ProverProofInvalid
if err := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, message.ProofType(proverTask.TaskType),
proverTask.TaskID, proverTask.ProverPublicKey, types.ProverProofInvalid); err != nil {
log.Error("update prover task proving status failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
"prover proving status", types.ProverProofInvalid, "err", err)
return err
}
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskFailed, 0); err != nil {
log.Error("failed to updated proof status ProvingTaskFailed", "hash", hash, "pubKey", pubKey, "error", err)
// update prover task failure type as ProverTaskFailureTypeVerifiedFailed
if err := m.proverTaskOrm.UpdateProverTaskFailureType(ctx, message.ProofType(proverTask.TaskType),
proverTask.TaskID, proverTask.ProverPublicKey, failedType, tx); err != nil {
log.Error("update prover task failure type failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
"prover failure type", failedType, "err", err)
return err
}
if message.ProofType(proverTask.TaskType) == message.ProofTypeChunk {
if err := m.chunkOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of chunk failure", "hash", proverTask.TaskID, "err", err)
return err
}
}
if message.ProofType(proverTask.TaskType) == message.ProofTypeBatch {
if err := m.batchOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of batch failure", "hash", proverTask.TaskID, "err", err)
return err
}
}
return nil
})
if err != nil {
log.Error("failed to update proof status as failed", "error", err)
}
}
@@ -334,30 +345,18 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
switch proofMsg.Type {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
log.Error("failed to update chunk proving_status", "msg.ID", hash, "error", err)
return err
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
log.Error("failed to update batch proving_status", "msg.ID", hash, "error", err)
return err
}
}
return nil
})
if err != nil {
return err
}
if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
return checkReadyErr
}
}
return nil
return err
}
func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string, proofType message.ProofType) bool {

View File

@@ -41,6 +41,8 @@ type Batch struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// rollup
RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -191,6 +193,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
BatchHeader: batchHeader.Encode(),
ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
RollupStatus: int16(types.RollupPending),
OracleStatus: int16(types.GasOraclePending),
}
@@ -209,19 +213,6 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
return &newBatch, nil
}
// UpdateChunkProofsStatusByBatchHash updates the status of chunk_proofs_status field for a given batch hash.
// The function will set the chunk_proofs_status to the status provided.
func (o *Batch) UpdateChunkProofsStatusByBatchHash(ctx context.Context, batchHash string, status types.ChunkProofsStatus) error {
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("hash = ?", batchHash)
if err := db.Update("chunk_proofs_status", status).Error; err != nil {
return fmt.Errorf("Batch.UpdateChunkProofsStatusByBatchHash error: %w, batch hash: %v, status: %v", err, batchHash, status.String())
}
return nil
}
// UpdateProvingStatus updates the proving status of a batch.
func (o *Batch) UpdateProvingStatus(ctx context.Context, hash string, status types.ProvingStatus, dbTX ...*gorm.DB) error {
db := o.db
@@ -275,28 +266,52 @@ func (o *Batch) UpdateProofByHash(ctx context.Context, hash string, proof *messa
return nil
}
// UpdateUnassignedBatchReturning update the unassigned batch and return the update record
func (o *Batch) UpdateUnassignedBatchReturning(ctx context.Context, limit int) ([]*Batch, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Batch{})
subQueryDB = subQueryDB.Select("batch.index")
// Lock the selected row to ensure atomic updates
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("batch.total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("batch.active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Joins("JOIN chunk ON chunk.batch_hash = batch.hash AND chunk.proving_status = ?", types.ProvingTaskVerified)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
// Perform the update and return the modified batch
var updatedBatch Batch
db = db.Model(&updatedBatch).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
result := db.Updates(map[string]interface{}{
"total_attempts": gorm.Expr("total_attempts + ?", 1),
"active_attempts": gorm.Expr("active_attempts + ?", 1),
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if limit == 0 {
if result.RowsAffected == 0 {
return nil, nil
}
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Batch{}).Select("index")
subQueryDB = subQueryDB.Where("proving_status = ? AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(limit)
var batches []*Batch
db = db.Model(&batches).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
return nil, fmt.Errorf("Batch.UpdateUnassignedBatchReturning error: %w", err)
}
return batches, nil
return &updatedBatch, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash.
func (o *Batch) DecreaseActiveAttemptsByHash(ctx context.Context, batchHash string, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("hash = ?", batchHash)
if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
return fmt.Errorf("Batch.DecreaseActiveAttemptsByHash error: %w, batch hash: %v", err, batchHash)
}
return nil
}

View File

@@ -40,6 +40,8 @@ type Chunk struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// batch
BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -168,19 +170,6 @@ func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) {
return chunks, nil
}
// CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected.
func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("batch_hash = ? AND proving_status != ?", batchHash, types.ProvingTaskVerified)
var count int64
if err := db.Count(&count).Error; err != nil {
return false, fmt.Errorf("Chunk.CheckIfBatchChunkProofsAreReady error: %w, batch hash: %v", err, batchHash)
}
return count == 0, nil
}
// GetChunkBatchHash retrieves the batchHash of a given chunk.
func (o *Chunk) GetChunkBatchHash(ctx context.Context, chunkHash string) (string, error) {
db := o.db.WithContext(ctx)
@@ -259,6 +248,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
ParentChunkStateRoot: parentChunkStateRoot,
WithdrawRoot: chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
}
db := o.db
@@ -341,32 +332,52 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
return nil
}
// UpdateUnassignedChunkReturning update the unassigned batch which end_block_number <= height and return the update record
func (o *Chunk) UpdateUnassignedChunkReturning(ctx context.Context, height, limit int) ([]*Chunk, error) {
if height <= 0 {
return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: height must be larger than zero")
// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, proverBlockHeight int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Chunk{})
subQueryDB = subQueryDB.Select("index")
// Lock the selected row to ensure atomic updates
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("end_block_number <= ?", proverBlockHeight)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
// Perform the update and return the modified chunk
var updatedChunk Chunk
db = db.Model(&updatedChunk).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
result := db.Updates(map[string]interface{}{
"total_attempts": gorm.Expr("total_attempts + ?", 1),
"active_attempts": gorm.Expr("active_attempts + ?", 1),
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if limit < 0 {
return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: limit must not be smaller than zero")
}
if limit == 0 {
if result.RowsAffected == 0 {
return nil, nil
}
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Chunk{}).Select("index")
subQueryDB = subQueryDB.Where("proving_status = ?", types.ProvingTaskUnassigned)
subQueryDB = subQueryDB.Where("end_block_number <= ?", height)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(limit)
var chunks []*Chunk
db = db.Model(&chunks).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
return nil, fmt.Errorf("Chunk.UpdateUnassignedBatchReturning error: %w", err)
}
return chunks, nil
return &updatedChunk, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.
func (o *Chunk) DecreaseActiveAttemptsByHash(ctx context.Context, chunkHash string, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("hash = ?", chunkHash)
if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
return fmt.Errorf("Chunk.DecreaseActiveAttemptsByHash error: %w, chunk hash: %v", err, chunkHash)
}
return nil
}

View File

@@ -94,6 +94,24 @@ func (o *ProverTask) GetProverTasks(ctx context.Context, fields map[string]inter
return proverTasks, nil
}
// GetFailedTaskAssignmentCount returns the number of times a task with the specified TaskID has been assigned and failed.
func (o *ProverTask) GetFailedTaskAssignmentCount(ctx context.Context, taskID string, dbTX ...*gorm.DB) (uint64, error) {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id = ?", taskID)
db = db.Where("proving_status = ?", types.ProverProofInvalid)
var count int64
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("ProverTask.GetTaskAssignmentCount failed, taskID: %v, err: %w", taskID, err)
}
return uint64(count), nil
}
// GetProverTasksByHashes retrieves the ProverTask records associated with the specified hashes.
// The returned prover task objects are sorted in ascending order by their ids.
func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, hashes []string) ([]*ProverTask, error) {
@@ -143,6 +161,20 @@ func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskID string
return types.ProverProveStatus(proverTask.ProvingStatus), nil
}
// GetFailureTypeByTaskID retrieves the failure type of a prover task
func (o *ProverTask) GetFailureTypeByTaskID(ctx context.Context, taskID string) (types.ProverTaskFailureType, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Select("failure_type")
db = db.Where("task_id = ?", taskID)
var proverTask ProverTask
if err := db.Find(&proverTask).Error; err != nil {
return types.ProverTaskFailureTypeUndefined, fmt.Errorf("ProverTask.GetFailureTypeByTaskID error: %w, taskID: %v", err, taskID)
}
return types.ProverTaskFailureType(proverTask.FailureType), nil
}
// GetTimeoutAssignedProverTasks get the timeout and assigned proving_status prover task
func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit int, taskType message.ProofType, timeout time.Duration) ([]ProverTask, error) {
db := o.db.WithContext(ctx)
@@ -160,32 +192,14 @@ func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit in
return proverTasks, nil
}
// TaskTimeoutMoreThanOnce get the timeout twice task. a temp design
func (o *ProverTask) TaskTimeoutMoreThanOnce(ctx context.Context, taskID string) bool {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id", taskID)
db = db.Where("proving_status", int(types.ProverProofInvalid))
var count int64
if err := db.Count(&count).Error; err != nil {
return true
}
if count >= 1 {
return true
}
return false
}
// SetProverTask updates or inserts a ProverTask record.
func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
db := o.db.WithContext(ctx)
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "task_type"}, {Name: "task_id"}, {Name: "prover_public_key"}, {Name: "prover_version"}},

View File

@@ -283,7 +283,7 @@ func testInvalidProof(t *testing.T) {
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, batch.Hash)
assert.NoError(t, err)
// create mock provers.
@@ -292,13 +292,17 @@ func testInvalidProof(t *testing.T) {
var proofType message.ProofType
if i%2 == 0 {
proofType = message.ProofTypeChunk
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedSuccess, types.Success)
} else {
proofType = message.ProofTypeBatch
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
}
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
}
// verify proof status
@@ -307,17 +311,25 @@ func testInvalidProof(t *testing.T) {
tickStop = time.Tick(time.Minute)
)
var chunkProofStatus types.ProvingStatus
var batchProofStatus types.ProvingStatus
var (
chunkProofStatus types.ProverProveStatus
batchProofStatus types.ProverProveStatus
batchFailureType types.ProverTaskFailureType
)
for {
select {
case <-tick:
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
chunkProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
batchProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProvingTaskFailed && batchProofStatus == types.ProvingTaskFailed {
batchFailureType, err = proverTaskOrm.GetFailureTypeByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProverProofValid &&
batchProofStatus == types.ProverProofInvalid &&
batchFailureType == types.ProverTaskFailureTypeVerifiedFailed {
return
}
case <-tickStop:
@@ -342,20 +354,12 @@ func testProofGeneratedFailed(t *testing.T) {
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
// create mock provers.
provers := make([]*mockProver, 2)
provers := make([]*mockProver, 1)
for i := 0; i < len(provers); i++ {
var proofType message.ProofType
if i%2 == 0 {
proofType = message.ProofTypeChunk
} else {
proofType = message.ProofTypeBatch
}
proofType = message.ProofTypeChunk
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
@@ -370,9 +374,7 @@ func testProofGeneratedFailed(t *testing.T) {
var (
chunkProofStatus types.ProvingStatus
batchProofStatus types.ProvingStatus
chunkProverTaskProvingStatus types.ProverProveStatus
batchProverTaskProvingStatus types.ProverProveStatus
)
for {
@@ -380,21 +382,17 @@ func testProofGeneratedFailed(t *testing.T) {
case <-tick:
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProvingTaskAssigned && batchProofStatus == types.ProvingTaskAssigned {
if chunkProofStatus == types.ProvingTaskAssigned {
return
}
chunkProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProverTaskProvingStatus == types.ProverProofInvalid && batchProverTaskProvingStatus == types.ProverProofInvalid {
if chunkProverTaskProvingStatus == types.ProverProofInvalid {
return
}
case <-tickStop:
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String(), "batchProofStatus", batchProofStatus.String())
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String())
return
}
}
@@ -414,32 +412,14 @@ func testTimeoutProof(t *testing.T) {
dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
// create first chunk & batch mock prover, that will not send any proof.
chunkProver1 := newMockProver(t, "prover_test"+strconv.Itoa(0), coordinatorURL, message.ProofTypeChunk)
proverChunkTask := chunkProver1.getProverTask(t, message.ProofTypeChunk)
assert.NotNil(t, proverChunkTask)
batchProver1 := newMockProver(t, "prover_test"+strconv.Itoa(1), coordinatorURL, message.ProofTypeBatch)
proverBatchTask := batchProver1.getProverTask(t, message.ProofTypeBatch)
assert.NotNil(t, proverBatchTask)
// verify proof status, it should be assigned, because prover didn't send any proof
chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
assert.Equal(t, chunkProofStatus, types.ProvingTaskAssigned)
batchProofStatus, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
assert.Equal(t, batchProofStatus, types.ProvingTaskAssigned)
// wait coordinator to reset the prover task proving status
time.Sleep(time.Duration(conf.ProverManager.BatchCollectionTimeSec*2) * time.Second)
time.Sleep(time.Duration(conf.ProverManager.ChunkCollectionTimeSec*2) * time.Second)
// create second mock prover, that will send valid proof.
chunkProver2 := newMockProver(t, "prover_test"+strconv.Itoa(2), coordinatorURL, message.ProofTypeChunk)
@@ -447,17 +427,8 @@ func testTimeoutProof(t *testing.T) {
assert.NotNil(t, proverChunkTask2)
chunkProver2.submitProof(t, proverChunkTask2, verifiedSuccess, types.Success)
batchProver2 := newMockProver(t, "prover_test"+strconv.Itoa(3), coordinatorURL, message.ProofTypeBatch)
proverBatchTask2 := batchProver2.getProverTask(t, message.ProofTypeBatch)
assert.NotNil(t, proverBatchTask2)
batchProver2.submitProof(t, proverBatchTask2, verifiedSuccess, types.Success)
// verify proof status, it should be verified now, because second prover sent valid proof
chunkProofStatus2, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
assert.Equal(t, chunkProofStatus2, types.ProvingTaskVerified)
batchProofStatus2, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
assert.Equal(t, batchProofStatus2, types.ProvingTaskVerified)
}

View File

@@ -24,6 +24,8 @@ create table chunk
prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
proved_at TIMESTAMP(0) DEFAULT NULL,
proof_time_sec INTEGER DEFAULT NULL,
total_attempts SMALLINT NOT NULL DEFAULT 0,
active_attempts SMALLINT NOT NULL DEFAULT 0,
-- batch
batch_hash VARCHAR DEFAULT NULL,
@@ -50,6 +52,10 @@ on chunk (hash) where deleted_at IS NULL;
create index batch_hash_index
on chunk (batch_hash) where deleted_at IS NULL;
create index chunk_idx_total_attempts_and_active_attempts_and_end_block_number
on chunk (total_attempts, active_attempts, end_block_number)
where deleted_at IS NULL;
-- +goose StatementEnd
-- +goose Down

View File

@@ -22,6 +22,8 @@ create table batch
prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
proved_at TIMESTAMP(0) DEFAULT NULL,
proof_time_sec INTEGER DEFAULT NULL,
total_attempts SMALLINT NOT NULL DEFAULT 0,
active_attempts SMALLINT NOT NULL DEFAULT 0,
-- rollup
rollup_status SMALLINT NOT NULL DEFAULT 1,
@@ -46,6 +48,10 @@ on batch (index) where deleted_at IS NULL;
create unique index batch_hash_uindex
on batch (hash) where deleted_at IS NULL;
create index batch_idx_total_attempts_and_active_attempts_and_chunk_proofs_status
on batch (total_attempts, active_attempts, chunk_proofs_status)
where deleted_at IS NULL;
comment
on column batch.chunk_proofs_status is 'undefined, pending, ready';

View File

@@ -193,6 +193,7 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7 h1:6IrxszG5G+O7zhtkWxq6+unVvnrm1fqV2Pe+T95DUzw=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7/go.mod h1:gFnFS95y8HstDP6P9pPwzrxOOC5TRDkwbM+ao15ChAI=
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80 h1:DuBDHVjgGMPki7bAyh91+3cF1Vh34sAEdH8JQgbc2R0=
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80/go.mod h1:gzbVz57IDJgQ9rLQwfSk696JGWof8ftznEL9GoAv3NI=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -238,6 +239,7 @@ github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf h1:Yt+4K30SdjOkRoRRm3v
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7 h1:kgvzE5wLsLa7XKfV85VZl40QXaMCaeFtHpPwJ8fhotY=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs=
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3 h1:+3HCtB74++ClLy8GgjUQYeC8R4ILzVcIe8+5edAJJnE=
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7 h1:tYwu/z8Y0NkkzGEh3z21mSWggMg4LwLRFucLS7TjARg=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
@@ -277,6 +279,7 @@ github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILD
github.com/gavv/httpexpect v2.0.0+incompatible h1:1X9kcRshkSKEjNJJxX9Y9mQ5BRfbxU5kORdjhlA1yX8=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732 h1:AB7YjNrzlVHsYz06zCULVV2zYCEft82P86dSmtwxKL0=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732/go.mod h1:o/XfIXWi4/GqbQirfRm5uTbXMG5NpqxkxblnbZ+QM9I=
github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b h1:vMT47RYsrftsHSTQhqXwC3BYflo38OLC3Y4LtXtLyU0=
github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b/go.mod h1:CDncRYVRSDqwakm282WEkjfaAj1hxU/v5RXxk5nXOiI=
github.com/getkin/kin-openapi v0.61.0 h1:6awGqF5nG5zkVpMsAih1QH4VgzS8phTxECUWIFo7zko=
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM=
@@ -374,6 +377,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 h1:Ak8CrdlwwXwAZxzS66vgPt4U8yUZX7JwLvVR58FN5jM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
@@ -472,6 +476,7 @@ github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQ
github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4=
github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c h1:AqsttAyEyIEsNz5WLRwuRwjiT5CMDUfLk6cFJDVPebs=
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/kataras/blocks v0.0.7 h1:cF3RDY/vxnSRezc7vLFlQFTYXG/yAr1o7WImJuZbzC4=
github.com/kataras/blocks v0.0.7/go.mod h1:UJIU97CluDo0f+zEjbnbkeMRlvYORtmc1304EeyXf4I=
@@ -711,6 +716,7 @@ go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4A
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
@@ -782,6 +788,7 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=

View File

@@ -37,8 +37,11 @@ func NewCoordinatorClient(cfg *config.CoordinatorConfig, proverName string, priv
SetBaseURL(cfg.BaseURL).
AddRetryCondition(func(r *resty.Response, _ error) bool {
// Check for HTTP 5xx errors, e.g., coordinator is restarting.
log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
return r.StatusCode() >= http.StatusInternalServerError
if r.StatusCode() >= http.StatusInternalServerError {
log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
return true
}
return false
})
log.Info("successfully initialized prover client",