Mirror of https://github.com/scroll-tech/scroll.git (synced 2026-01-13 07:57:58 -05:00)

Compare commits: develop...fix/chunk_ (13 commits)

| SHA1 |
|---|
| 06dd3f6100 |
| aa6c216405 |
| 308f8f0502 |
| e92e2d54ba |
| a7870fc19c |
| 5dc77d98d9 |
| 158655dadb |
| 2350802af9 |
| 5e7dbbce71 |
| 49376db8e3 |
| 360f5ef72b |
| a88d1428bc |
| 5f6e50d883 |
```diff
@@ -40,6 +40,8 @@ type Batch struct {
 	ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
 	ProvedAt         *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
 	ProofTimeSec     int32      `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
+	TotalAttempts    int16      `json:"total_attempts" gorm:"column:total_attempts;default:0"`
+	ActiveAttempts   int16      `json:"active_attempts" gorm:"column:active_attempts;default:0"`

 	// rollup
 	RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -267,6 +269,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
 		BatchHeader:       batchHeader.Encode(),
 		ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
 		ProvingStatus:     int16(types.ProvingTaskUnassigned),
+		TotalAttempts:     0,
+		ActiveAttempts:    0,
 		RollupStatus:      int16(types.RollupPending),
 		OracleStatus:      int16(types.GasOraclePending),
 	}
```
```diff
@@ -37,6 +37,8 @@ type Chunk struct {
 	ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
 	ProvedAt         *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
 	ProofTimeSec     int32      `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
+	TotalAttempts    int16      `json:"total_attempts" gorm:"column:total_attempts;default:0"`
+	ActiveAttempts   int16      `json:"active_attempts" gorm:"column:active_attempts;default:0"`

 	// batch
 	BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -175,6 +177,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
 		ParentChunkStateRoot: parentChunkStateRoot,
 		WithdrawRoot:         chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
 		ProvingStatus:        int16(types.ProvingTaskUnassigned),
+		TotalAttempts:        0,
+		ActiveAttempts:       0,
 	}

 	db := o.db
```
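The two new columns drive the reworked assignment flow in the hunks further down: `total_attempts` counts how many times a batch or chunk has ever been handed to a prover, while `active_attempts` counts the assignments currently outstanding. Below is a minimal, self-contained sketch of the bookkeeping these counters appear intended to support; the `task` type and helper names are illustrative only and do not exist in the repository.

```go
package main

import "fmt"

// task mirrors only the two counters added to the batch and chunk models.
// Illustrative sketch; the real logic lives in the GORM methods
// UpdateBatchAttemptsReturning / UpdateChunkAttemptsReturning / DecreaseActiveAttemptsByHash.
type task struct {
	TotalAttempts  int16
	ActiveAttempts int16
}

// assignable reports whether the task may be handed to one more prover.
func (t *task) assignable(maxActive, maxTotal int16) bool {
	return t.ActiveAttempts < maxActive && t.TotalAttempts < maxTotal
}

// assign records one more outstanding assignment.
func (t *task) assign() {
	t.TotalAttempts++
	t.ActiveAttempts++
}

// release frees the slot again when a proof comes back invalid or times out.
func (t *task) release() {
	t.ActiveAttempts--
}

func main() {
	t := &task{}
	for t.assignable(2, 5) {
		t.assign()
	}
	fmt.Println(t.TotalAttempts, t.ActiveAttempts) // 2 2: capped by maxActive
	t.release()
	fmt.Println(t.assignable(2, 5)) // true: one slot is free again, total is still below maxTotal
}
```

In the real code the increments and decrements happen inside SQL statements (see the ORM hunks below), so concurrent coordinators cannot double-count an assignment.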
```diff
@@ -103,6 +103,10 @@ const (
 	ProverTaskFailureTypeUndefined ProverTaskFailureType = iota
 	// ProverTaskFailureTypeTimeout prover task failure of timeout
 	ProverTaskFailureTypeTimeout
+	// ProverTaskFailureTypeValidatedFailed prover task failure of validated failed by coordinator
+	ProverTaskFailureTypeValidatedFailed
+	// ProverTaskFailureTypeVerifiedFailed prover task failure of verified failed by coordinator
+	ProverTaskFailureTypeVerifiedFailed
 )

 func (r ProverTaskFailureType) String() string {
@@ -111,8 +115,12 @@ func (r ProverTaskFailureType) String() string {
 		return "prover task failure undefined"
 	case ProverTaskFailureTypeTimeout:
 		return "prover task failure timeout"
+	case ProverTaskFailureTypeValidatedFailed:
+		return "prover task failure validated failed"
+	case ProverTaskFailureTypeVerifiedFailed:
+		return "prover task failure verified failed"
 	default:
-		return "illegal prover task failure type"
+		return fmt.Sprintf("illegal prover task failure type (%d)", int32(r))
 	}
 }
```
```diff
@@ -6,7 +6,7 @@ import (
 	"strings"
 )

-var tag = "v4.1.59"
+var tag = "v4.1.60"

 var commit = func() string {
 	if info, ok := debug.ReadBuildInfo(); ok {
```
```diff
@@ -146,8 +146,6 @@ func (c *Collector) timeoutChunkProofTask() {
 }

 func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout prometheus.Counter) {
-	// here not update the block batch proving status failed, because the collector loop will check
-	// the attempt times. if reach the times, the collector will set the block batch proving status.
 	for _, assignedProverTask := range assignedProverTasks {
 		timeout.Inc()
 		log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
@@ -156,26 +154,55 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
 		// update prover task proving status as ProverProofInvalid
 		if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
 			assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
-			log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
+			log.Error("update prover task proving status failure",
+				"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
+				"prover proving status", types.ProverProofInvalid, "err", err)
 			return err
 		}

-		// update prover task failure type
+		// update prover task failure type as ProverTaskFailureTypeTimeout
 		if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
 			assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
-			log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
+			log.Error("update prover task failure type failure",
+				"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
+				"prover failure type", types.ProverTaskFailureTypeTimeout, "err", err)
 			return err
 		}

-		// update the task to unassigned, let collector restart it
 		if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
-			if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
-				log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
+			if err := c.chunkOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
+				log.Error("decrease active attempts of chunk failure", "hash", assignedProverTask.TaskID, "err", err)
+				return err
 			}
 		}
 		if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
-			if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
-				log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
+			if err := c.batchOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
+				log.Error("decrease active attempts of batch failure", "hash", assignedProverTask.TaskID, "err", err)
+				return err
+			}
+		}
+
+		var failedAssignmentCount uint64
+		failedAssignmentCount, err := c.proverTaskOrm.GetFailedTaskAssignmentCount(c.ctx, assignedProverTask.TaskID, tx)
+		if err != nil {
+			log.Error("get failed task assignment count failure", "taskID", assignedProverTask.TaskID, "err", err)
+			return err
+		}
+
+		if failedAssignmentCount >= uint64(c.cfg.ProverManager.SessionAttempts) {
+			if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
+				if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
+					log.Error("update chunk proving status failure", "hash", assignedProverTask.TaskID,
+						"status", types.ProvingTaskFailed, "err", err)
+					return err
+				}
+			}
+			if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
+				if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
+					log.Error("update batch proving status failure", "hash", assignedProverTask.TaskID,
+						"status", types.ProvingTaskFailed, "err", err)
+					return err
+				}
 			}
 		}
 		return nil
```
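In this reworked timeout path the collector no longer flips a timed-out batch or chunk back to `ProvingTaskUnassigned`. It marks the prover's own record invalid, returns the task's `active_attempts` slot, and only marks the task itself failed once the number of failed assignments reaches `SessionAttempts`. The sketch below restates that decision sequence in plain Go; the function and step strings are illustrative, not coordinator code (the real work happens through the ORM calls shown above, against a `tx` supplied by an enclosing transaction).

```go
package main

import "fmt"

// timeoutOutcome is an illustrative restatement of the policy the reworked
// Collector.check applies to one timed-out assignment.
func timeoutOutcome(failedAssignments, sessionAttempts uint64) []string {
	steps := []string{
		"set prover task proving_status = ProverProofInvalid",
		"set prover task failure_type = ProverTaskFailureTypeTimeout",
		"decrement active_attempts on the batch/chunk so it can be reassigned",
	}
	// The task is only abandoned once enough assignments have already failed.
	if failedAssignments >= sessionAttempts {
		steps = append(steps, "set batch/chunk proving_status = ProvingTaskFailed")
	}
	return steps
}

func main() {
	for _, s := range timeoutOutcome(3, 3) {
		fmt.Println(s)
	}
}
```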
```diff
@@ -24,7 +24,12 @@ import (

 // BatchProverTask is prover task implement for batch proof
 type BatchProverTask struct {
-	BaseProverTask
+	cfg *config.Config
+	db  *gorm.DB

+	batchOrm      *orm.Batch
+	chunkOrm      *orm.Chunk
+	proverTaskOrm *orm.ProverTask
+
 	batchAttemptsExceedTotal prometheus.Counter
 	batchTaskGetTaskTotal    prometheus.Counter
@@ -33,13 +38,11 @@ type BatchProverTask struct {
 // NewBatchProverTask new a batch collector
 func NewBatchProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *BatchProverTask {
 	bp := &BatchProverTask{
-		BaseProverTask: BaseProverTask{
-			db:            db,
-			cfg:           cfg,
-			chunkOrm:      orm.NewChunk(db),
-			batchOrm:      orm.NewBatch(db),
-			proverTaskOrm: orm.NewProverTask(db),
-		},
+		db:            db,
+		cfg:           cfg,
+		chunkOrm:      orm.NewChunk(db),
+		batchOrm:      orm.NewBatch(db),
+		proverTaskOrm: orm.NewProverTask(db),
 		batchAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "coordinator_batch_attempts_exceed_total",
 			Help: "Total number of batch attempts exceed.",
@@ -81,27 +84,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
 		return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
 	}

-	batchTasks, err := bp.batchOrm.UpdateUnassignedBatchReturning(ctx, 1)
+	// load and send a batch task
+	batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, bp.cfg.ProverManager.ProversPerSession, bp.cfg.ProverManager.SessionAttempts)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get unassigned batch proving tasks, error:%w", err)
 	}

-	if len(batchTasks) == 0 {
+	if batchTask == nil {
 		return nil, nil
 	}

-	if len(batchTasks) != 1 {
-		return nil, fmt.Errorf("get unassigned batch proving task len not 1, batch tasks:%v", batchTasks)
-	}
-
-	batchTask := batchTasks[0]
 	log.Info("start batch proof generation session", "id", batchTask.Hash, "public key", publicKey, "prover name", proverName)

-	if !bp.checkAttemptsExceeded(batchTask.Hash, message.ProofTypeBatch) {
-		bp.batchAttemptsExceedTotal.Inc()
-		return nil, fmt.Errorf("the batch task id:%s check attempts have reach the maximum", batchTask.Hash)
-	}
-
 	proverTask := orm.ProverTask{
 		TaskID:          batchTask.Hash,
 		ProverPublicKey: publicKey.(string),
```
```diff
@@ -24,7 +24,12 @@ import (

 // ChunkProverTask the chunk prover task
 type ChunkProverTask struct {
-	BaseProverTask
+	cfg *config.Config
+	db  *gorm.DB

+	chunkOrm      *orm.Chunk
+	blockOrm      *orm.L2Block
+	proverTaskOrm *orm.ProverTask
+
 	chunkAttemptsExceedTotal prometheus.Counter
 	chunkTaskGetTaskTotal    prometheus.Counter
@@ -33,14 +38,11 @@ type ChunkProverTask struct {
 // NewChunkProverTask new a chunk prover task
 func NewChunkProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *ChunkProverTask {
 	cp := &ChunkProverTask{
-		BaseProverTask: BaseProverTask{
-			db:            db,
-			cfg:           cfg,
-			chunkOrm:      orm.NewChunk(db),
-			blockOrm:      orm.NewL2Block(db),
-			proverTaskOrm: orm.NewProverTask(db),
-		},
-
+		db:            db,
+		cfg:           cfg,
+		chunkOrm:      orm.NewChunk(db),
+		blockOrm:      orm.NewL2Block(db),
+		proverTaskOrm: orm.NewProverTask(db),
 		chunkAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "coordinator_chunk_attempts_exceed_total",
 			Help: "Total number of chunk attempts exceed.",
@@ -82,29 +84,18 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
 		return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
 	}

-	// load and send chunk tasks
-	chunkTasks, err := cp.chunkOrm.UpdateUnassignedChunkReturning(ctx, getTaskParameter.ProverHeight, 1)
+	// load and send a chunk task
+	chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, cp.cfg.ProverManager.ProversPerSession, cp.cfg.ProverManager.SessionAttempts)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get unassigned chunk proving tasks, error:%w", err)
 	}

-	if len(chunkTasks) == 0 {
+	if chunkTask == nil {
 		return nil, nil
 	}

-	if len(chunkTasks) != 1 {
-		return nil, fmt.Errorf("get unassigned chunk proving task len not 1, chunk tasks:%v", chunkTasks)
-	}
-
-	chunkTask := chunkTasks[0]
-
 	log.Info("start chunk generation session", "id", chunkTask.Hash, "public key", publicKey, "prover name", proverName)

-	if !cp.checkAttemptsExceeded(chunkTask.Hash, message.ProofTypeChunk) {
-		cp.chunkAttemptsExceedTotal.Inc()
-		return nil, fmt.Errorf("chunk proof hash id:%s check attempts have reach the maximum", chunkTask.Hash)
-	}
-
 	proverTask := orm.ProverTask{
 		TaskID:          chunkTask.Hash,
 		ProverPublicKey: publicKey.(string),
```
```diff
@@ -1,17 +1,8 @@
 package provertask

 import (
-	"context"
-
 	"github.com/gin-gonic/gin"
-	"github.com/scroll-tech/go-ethereum/log"
-	"gorm.io/gorm"
-
-	"scroll-tech/common/types"
-	"scroll-tech/common/types/message"

-	"scroll-tech/coordinator/internal/config"
-	"scroll-tech/coordinator/internal/orm"
 	coordinatorType "scroll-tech/coordinator/internal/types"
 )

@@ -19,53 +10,3 @@
 type ProverTask interface {
 	Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
 }
-
-// BaseProverTask a base prover task which contain series functions
-type BaseProverTask struct {
-	cfg *config.Config
-	ctx context.Context
-	db  *gorm.DB
-
-	batchOrm      *orm.Batch
-	chunkOrm      *orm.Chunk
-	blockOrm      *orm.L2Block
-	proverTaskOrm *orm.ProverTask
-}
-
-// checkAttempts use the count of prover task info to check the attempts
-func (b *BaseProverTask) checkAttemptsExceeded(hash string, taskType message.ProofType) bool {
-	whereFields := make(map[string]interface{})
-	whereFields["task_id"] = hash
-	whereFields["task_type"] = int16(taskType)
-	proverTasks, err := b.proverTaskOrm.GetProverTasks(b.ctx, whereFields, nil, 0, 0)
-	if err != nil {
-		log.Error("get prover task error", "hash id", hash, "error", err)
-		return true
-	}
-
-	if len(proverTasks) >= int(b.cfg.ProverManager.SessionAttempts) {
-		log.Warn("proof generation prover task reach the max attempts", "hash", hash)
-
-		transErr := b.db.Transaction(func(tx *gorm.DB) error {
-			switch message.ProofType(proverTasks[0].TaskType) {
-			case message.ProofTypeChunk:
-				if err := b.chunkOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
-					log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
-				}
-			case message.ProofTypeBatch:
-				if err := b.batchOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
-					log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
-				}
-			}
-			// update the prover task status to let timeout checker don't check it.
-			if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
-				log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
-			}
-			return nil
-		})
-		if transErr == nil {
-			return false
-		}
-	}
-	return true
-}
```
```diff
@@ -145,6 +145,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
 		"prover pk", pk, "prove type", proverTask.TaskType, "proof time", proofTimeSec)

 	if err = m.validator(ctx, proverTask, pk, proofMsg); err != nil {
+		m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeValidatedFailed)
 		return err
 	}

@@ -161,7 +162,8 @@

 	if verifyErr != nil || !success {
 		m.verifierFailureTotal.WithLabelValues(proverVersion).Inc()
-		m.proofFailure(ctx, proofMsg.ID, pk, proofMsg)
+
+		m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeVerifiedFailed)

 		log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
 			"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
@@ -186,25 +188,6 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
 	return nil
 }

-func (m *ProofReceiverLogic) checkAreAllChunkProofsReady(ctx context.Context, chunkHash string) error {
-	batchHash, err := m.chunkOrm.GetChunkBatchHash(ctx, chunkHash)
-	if err != nil {
-		return err
-	}
-
-	allReady, err := m.chunkOrm.CheckIfBatchChunkProofsAreReady(ctx, batchHash)
-	if err != nil {
-		return err
-	}
-	if allReady {
-		err := m.batchOrm.UpdateChunkProofsStatusByBatchHash(ctx, batchHash, types.ChunkProofsStatusReady)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.ProverTask, pk string, proofMsg *message.ProofMsg) (err error) {
 	defer func() {
 		if err != nil {
@@ -267,12 +250,40 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
 	return nil
 }

-func (m *ProofReceiverLogic) proofFailure(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
-	log.Info("proof failure update proof status", "hash", hash, "public key", pubKey,
-		"proof type", proofMsg.Type.String(), "status", types.ProvingTaskFailed.String())
+func (m *ProofReceiverLogic) proofFailure(ctx context.Context, proverTask *orm.ProverTask, failedType types.ProverTaskFailureType) {
+	err := m.db.Transaction(func(tx *gorm.DB) error {
+		// update prover task proving status as ProverProofInvalid
+		if err := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, message.ProofType(proverTask.TaskType),
+			proverTask.TaskID, proverTask.ProverPublicKey, types.ProverProofInvalid); err != nil {
+			log.Error("update prover task proving status failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
+				"prover proving status", types.ProverProofInvalid, "err", err)
+			return err
+		}

-	if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskFailed, 0); err != nil {
-		log.Error("failed to updated proof status ProvingTaskFailed", "hash", hash, "pubKey", pubKey, "error", err)
+		// update prover task failure type as ProverTaskFailureTypeVerifiedFailed
+		if err := m.proverTaskOrm.UpdateProverTaskFailureType(ctx, message.ProofType(proverTask.TaskType),
+			proverTask.TaskID, proverTask.ProverPublicKey, failedType, tx); err != nil {
+			log.Error("update prover task failure type failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
+				"prover failure type", failedType, "err", err)
+			return err
+		}
+
+		if message.ProofType(proverTask.TaskType) == message.ProofTypeChunk {
+			if err := m.chunkOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
+				log.Error("decrease active attempts of chunk failure", "hash", proverTask.TaskID, "err", err)
+				return err
+			}
+		}
+		if message.ProofType(proverTask.TaskType) == message.ProofTypeBatch {
+			if err := m.batchOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
+				log.Error("decrease active attempts of batch failure", "hash", proverTask.TaskID, "err", err)
+				return err
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		log.Error("failed to update proof status as failed", "error", err)
 	}
 }

@@ -334,30 +345,18 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
 		switch proofMsg.Type {
 		case message.ProofTypeChunk:
 			if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
-				log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
+				log.Error("failed to update chunk proving_status", "msg.ID", hash, "error", err)
 				return err
 			}
 		case message.ProofTypeBatch:
 			if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
-				log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
+				log.Error("failed to update batch proving_status", "msg.ID", hash, "error", err)
 				return err
 			}
 		}
 		return nil
 	})
-	if err != nil {
-		return err
-	}
-
-	if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
-		if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
-			log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
-			return checkReadyErr
-		}
-	}
-
-	return nil
+	return err
 }

 func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string, proofType message.ProofType) bool {
```
```diff
@@ -41,6 +41,8 @@ type Batch struct {
 	ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
 	ProvedAt         *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
 	ProofTimeSec     int32      `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
+	TotalAttempts    int16      `json:"total_attempts" gorm:"column:total_attempts;default:0"`
+	ActiveAttempts   int16      `json:"active_attempts" gorm:"column:active_attempts;default:0"`

 	// rollup
 	RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -191,6 +193,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
 		BatchHeader:       batchHeader.Encode(),
 		ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
 		ProvingStatus:     int16(types.ProvingTaskUnassigned),
+		TotalAttempts:     0,
+		ActiveAttempts:    0,
 		RollupStatus:      int16(types.RollupPending),
 		OracleStatus:      int16(types.GasOraclePending),
 	}
@@ -209,19 +213,6 @@
 	return &newBatch, nil
 }

-// UpdateChunkProofsStatusByBatchHash updates the status of chunk_proofs_status field for a given batch hash.
-// The function will set the chunk_proofs_status to the status provided.
-func (o *Batch) UpdateChunkProofsStatusByBatchHash(ctx context.Context, batchHash string, status types.ChunkProofsStatus) error {
-	db := o.db.WithContext(ctx)
-	db = db.Model(&Batch{})
-	db = db.Where("hash = ?", batchHash)
-
-	if err := db.Update("chunk_proofs_status", status).Error; err != nil {
-		return fmt.Errorf("Batch.UpdateChunkProofsStatusByBatchHash error: %w, batch hash: %v, status: %v", err, batchHash, status.String())
-	}
-	return nil
-}
-
 // UpdateProvingStatus updates the proving status of a batch.
 func (o *Batch) UpdateProvingStatus(ctx context.Context, hash string, status types.ProvingStatus, dbTX ...*gorm.DB) error {
 	db := o.db
@@ -275,28 +266,52 @@ func (o *Batch) UpdateProofByHash(ctx context.Context, hash string, proof *messa
 	return nil
 }

-// UpdateUnassignedBatchReturning update the unassigned batch and return the update record
-func (o *Batch) UpdateUnassignedBatchReturning(ctx context.Context, limit int) ([]*Batch, error) {
-	if limit < 0 {
-		return nil, errors.New("limit must not be smaller than zero")
-	}
-	if limit == 0 {
-		return nil, nil
-	}
-
-	db := o.db.WithContext(ctx)
-
-	subQueryDB := db.Model(&Batch{}).Select("index")
-	subQueryDB = subQueryDB.Where("proving_status = ? AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady)
-	subQueryDB = subQueryDB.Order("index ASC")
-	subQueryDB = subQueryDB.Limit(limit)
-
-	var batches []*Batch
-	db = db.Model(&batches).Clauses(clause.Returning{})
-	db = db.Where("index = (?)", subQueryDB)
-	db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
-	if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
-		return nil, fmt.Errorf("Batch.UpdateUnassignedBatchReturning error: %w", err)
-	}
-	return batches, nil
+// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions.
+func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
+	db := o.db.WithContext(ctx)
+	subQueryDB := db.Model(&Batch{})
+	subQueryDB = subQueryDB.Select("batch.index")
+	// Lock the selected row to ensure atomic updates
+	subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
+	subQueryDB = subQueryDB.Where("batch.total_attempts < ?", maxTotalAttempts)
+	subQueryDB = subQueryDB.Where("batch.active_attempts < ?", maxActiveAttempts)
+	subQueryDB = subQueryDB.Joins("JOIN chunk ON chunk.batch_hash = batch.hash AND chunk.proving_status = ?", types.ProvingTaskVerified)
+	subQueryDB = subQueryDB.Order("index ASC")
+	subQueryDB = subQueryDB.Limit(1)
+
+	// Perform the update and return the modified batch
+	var updatedBatch Batch
+	db = db.Model(&updatedBatch).Clauses(clause.Returning{})
+	db = db.Where("index = (?)", subQueryDB)
+	result := db.Updates(map[string]interface{}{
+		"total_attempts":  gorm.Expr("total_attempts + ?", 1),
+		"active_attempts": gorm.Expr("active_attempts + ?", 1),
+	})
+
+	if result.Error != nil {
+		return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
+			maxActiveAttempts, maxTotalAttempts, result.Error)
+	}
+	if result.RowsAffected == 0 {
+		return nil, nil
+	}
+	return &updatedBatch, nil
+}
+
+// DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash.
+func (o *Batch) DecreaseActiveAttemptsByHash(ctx context.Context, batchHash string, dbTX ...*gorm.DB) error {
+	db := o.db
+	if len(dbTX) > 0 && dbTX[0] != nil {
+		db = dbTX[0]
+	}
+	db = db.WithContext(ctx)
+	db = db.Model(&Batch{})
+	db = db.Where("hash = ?", batchHash)
+
+	if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
+		return fmt.Errorf("Batch.DecreaseActiveAttemptsByHash error: %w, batch hash: %v", err, batchHash)
+	}
+
+	return nil
 }
```
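`UpdateBatchAttemptsReturning` replaces the old `UpdateUnassignedBatchReturning`: a locked subquery (`clause.Locking{Strength: "UPDATE"}`) picks the earliest batch whose counters are still under the limits and whose chunks are all verified, and the outer update with `clause.Returning{}` bumps both counters and hands the row back in one statement, so concurrent `Assign` calls cannot claim the same batch beyond what `ProversPerSession` allows. Below is a hedged sketch of the caller-side contract only; the stand-in function is not the real ORM method and performs no database work.

```go
package main

import "fmt"

// Batch stands in for the ORM model; only the field used below is included.
type Batch struct{ Hash string }

// updateBatchAttemptsReturning sketches the contract of the new method:
// (nil, nil) when no batch currently satisfies total_attempts < maxTotal and
// active_attempts < maxActive (with all chunk proofs verified), otherwise the
// claimed batch with both counters already incremented by the same statement.
func updateBatchAttemptsReturning(maxActive, maxTotal uint8) (*Batch, error) {
	// Sketch only: the real method builds a locked subquery and a RETURNING
	// update through GORM, as shown in the diff above.
	return nil, nil
}

func main() {
	batch, err := updateBatchAttemptsReturning(2, 5)
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	if batch == nil {
		// This is the case the Assign handler maps to "no task right now" (nil, nil).
		fmt.Println("no assignable batch right now")
		return
	}
	fmt.Println("assigned batch", batch.Hash)
}
```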
```diff
@@ -40,6 +40,8 @@ type Chunk struct {
 	ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
 	ProvedAt         *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
 	ProofTimeSec     int32      `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
+	TotalAttempts    int16      `json:"total_attempts" gorm:"column:total_attempts;default:0"`
+	ActiveAttempts   int16      `json:"active_attempts" gorm:"column:active_attempts;default:0"`

 	// batch
 	BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -168,19 +170,6 @@ func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) {
 	return chunks, nil
 }

-// CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected.
-func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) {
-	db := o.db.WithContext(ctx)
-	db = db.Model(&Chunk{})
-	db = db.Where("batch_hash = ? AND proving_status != ?", batchHash, types.ProvingTaskVerified)
-
-	var count int64
-	if err := db.Count(&count).Error; err != nil {
-		return false, fmt.Errorf("Chunk.CheckIfBatchChunkProofsAreReady error: %w, batch hash: %v", err, batchHash)
-	}
-	return count == 0, nil
-}
-
 // GetChunkBatchHash retrieves the batchHash of a given chunk.
 func (o *Chunk) GetChunkBatchHash(ctx context.Context, chunkHash string) (string, error) {
 	db := o.db.WithContext(ctx)
@@ -259,6 +248,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
 		ParentChunkStateRoot: parentChunkStateRoot,
 		WithdrawRoot:         chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
 		ProvingStatus:        int16(types.ProvingTaskUnassigned),
+		TotalAttempts:        0,
+		ActiveAttempts:       0,
 	}

 	db := o.db
@@ -341,32 +332,52 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
 	return nil
 }

-// UpdateUnassignedChunkReturning update the unassigned batch which end_block_number <= height and return the update record
-func (o *Chunk) UpdateUnassignedChunkReturning(ctx context.Context, height, limit int) ([]*Chunk, error) {
-	if height <= 0 {
-		return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: height must be larger than zero")
-	}
-	if limit < 0 {
-		return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: limit must not be smaller than zero")
-	}
-	if limit == 0 {
-		return nil, nil
-	}
-
-	db := o.db.WithContext(ctx)
-
-	subQueryDB := db.Model(&Chunk{}).Select("index")
-	subQueryDB = subQueryDB.Where("proving_status = ?", types.ProvingTaskUnassigned)
-	subQueryDB = subQueryDB.Where("end_block_number <= ?", height)
-	subQueryDB = subQueryDB.Order("index ASC")
-	subQueryDB = subQueryDB.Limit(limit)
-
-	var chunks []*Chunk
-	db = db.Model(&chunks).Clauses(clause.Returning{})
-	db = db.Where("index = (?)", subQueryDB)
-	db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
-	if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
-		return nil, fmt.Errorf("Chunk.UpdateUnassignedBatchReturning error: %w", err)
-	}
-	return chunks, nil
+// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions.
+func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, proverBlockHeight int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
+	db := o.db.WithContext(ctx)
+	subQueryDB := db.Model(&Chunk{})
+	subQueryDB = subQueryDB.Select("index")
+	// Lock the selected row to ensure atomic updates
+	subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
+	subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
+	subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
+	subQueryDB = subQueryDB.Where("end_block_number <= ?", proverBlockHeight)
+	subQueryDB = subQueryDB.Order("index ASC")
+	subQueryDB = subQueryDB.Limit(1)
+
+	// Perform the update and return the modified chunk
+	var updatedChunk Chunk
+	db = db.Model(&updatedChunk).Clauses(clause.Returning{})
+	db = db.Where("index = (?)", subQueryDB)
+	result := db.Updates(map[string]interface{}{
+		"total_attempts":  gorm.Expr("total_attempts + ?", 1),
+		"active_attempts": gorm.Expr("active_attempts + ?", 1),
+	})
+
+	if result.Error != nil {
+		return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
+			maxActiveAttempts, maxTotalAttempts, result.Error)
+	}
+	if result.RowsAffected == 0 {
+		return nil, nil
+	}
+	return &updatedChunk, nil
+}
+
+// DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.
+func (o *Chunk) DecreaseActiveAttemptsByHash(ctx context.Context, chunkHash string, dbTX ...*gorm.DB) error {
+	db := o.db
+	if len(dbTX) > 0 && dbTX[0] != nil {
+		db = dbTX[0]
+	}
+	db = db.WithContext(ctx)
+	db = db.Model(&Chunk{})
+	db = db.Where("hash = ?", chunkHash)
+
+	if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
+		return fmt.Errorf("Chunk.DecreaseActiveAttemptsByHash error: %w, chunk hash: %v", err, chunkHash)
+	}
+
+	return nil
 }
```
```diff
@@ -94,6 +94,24 @@ func (o *ProverTask) GetProverTasks(ctx context.Context, fields map[string]inter
 	return proverTasks, nil
 }

+// GetFailedTaskAssignmentCount returns the number of times a task with the specified TaskID has been assigned and failed.
+func (o *ProverTask) GetFailedTaskAssignmentCount(ctx context.Context, taskID string, dbTX ...*gorm.DB) (uint64, error) {
+	db := o.db
+	if len(dbTX) > 0 && dbTX[0] != nil {
+		db = dbTX[0]
+	}
+	db = db.WithContext(ctx)
+	db = db.Model(&ProverTask{})
+	db = db.Where("task_id = ?", taskID)
+	db = db.Where("proving_status = ?", types.ProverProofInvalid)
+
+	var count int64
+	if err := db.Count(&count).Error; err != nil {
+		return 0, fmt.Errorf("ProverTask.GetTaskAssignmentCount failed, taskID: %v, err: %w", taskID, err)
+	}
+	return uint64(count), nil
+}
+
 // GetProverTasksByHashes retrieves the ProverTask records associated with the specified hashes.
 // The returned prover task objects are sorted in ascending order by their ids.
 func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, hashes []string) ([]*ProverTask, error) {
@@ -143,6 +161,20 @@ func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskID string
 	return types.ProverProveStatus(proverTask.ProvingStatus), nil
 }

+// GetFailureTypeByTaskID retrieves the failure type of a prover task
+func (o *ProverTask) GetFailureTypeByTaskID(ctx context.Context, taskID string) (types.ProverTaskFailureType, error) {
+	db := o.db.WithContext(ctx)
+	db = db.Model(&ProverTask{})
+	db = db.Select("failure_type")
+	db = db.Where("task_id = ?", taskID)
+
+	var proverTask ProverTask
+	if err := db.Find(&proverTask).Error; err != nil {
+		return types.ProverTaskFailureTypeUndefined, fmt.Errorf("ProverTask.GetFailureTypeByTaskID error: %w, taskID: %v", err, taskID)
+	}
+	return types.ProverTaskFailureType(proverTask.FailureType), nil
+}
+
 // GetTimeoutAssignedProverTasks get the timeout and assigned proving_status prover task
 func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit int, taskType message.ProofType, timeout time.Duration) ([]ProverTask, error) {
 	db := o.db.WithContext(ctx)
@@ -162,11 +194,12 @@ func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit in

 // SetProverTask updates or inserts a ProverTask record.
 func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
-	db := o.db.WithContext(ctx)
+	db := o.db
 	if len(dbTX) > 0 && dbTX[0] != nil {
 		db = dbTX[0]
 	}

+	db = db.WithContext(ctx)
 	db = db.Model(&ProverTask{})
 	db = db.Clauses(clause.OnConflict{
 		Columns: []clause.Column{{Name: "task_type"}, {Name: "task_id"}, {Name: "prover_public_key"}, {Name: "prover_version"}},
```
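`GetFailedTaskAssignmentCount` is what the collector consults before giving up on a task: it counts the `prover_task` rows for one `task_id` whose `proving_status` is `ProverProofInvalid`, the status written both on timeout and on validation or verification failure. A small self-contained sketch of the same computation over in-memory rows follows; the numeric constant is illustrative only, the real value comes from `types.ProverProofInvalid`.

```go
package main

import "fmt"

// proverTaskRow models only the prover_task columns used by the count.
type proverTaskRow struct {
	TaskID        string
	ProvingStatus int16
}

const proverProofInvalid int16 = 3 // illustrative value, not the real constant

// failedTaskAssignmentCount mirrors the WHERE task_id = ? AND proving_status = ?
// query: the number of assignments of one task that already ended in an invalid proof.
func failedTaskAssignmentCount(rows []proverTaskRow, taskID string) uint64 {
	var n uint64
	for _, r := range rows {
		if r.TaskID == taskID && r.ProvingStatus == proverProofInvalid {
			n++
		}
	}
	return n
}

func main() {
	rows := []proverTaskRow{
		{TaskID: "chunk-1", ProvingStatus: proverProofInvalid},
		{TaskID: "chunk-1", ProvingStatus: 1},
		{TaskID: "chunk-1", ProvingStatus: proverProofInvalid},
	}
	fmt.Println(failedTaskAssignmentCount(rows, "chunk-1")) // 2
}
```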
@@ -283,7 +283,7 @@ func testInvalidProof(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
|
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
|
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, batch.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// create mock provers.
|
// create mock provers.
|
||||||
@@ -292,13 +292,17 @@ func testInvalidProof(t *testing.T) {
|
|||||||
var proofType message.ProofType
|
var proofType message.ProofType
|
||||||
if i%2 == 0 {
|
if i%2 == 0 {
|
||||||
proofType = message.ProofTypeChunk
|
proofType = message.ProofTypeChunk
|
||||||
|
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
|
||||||
|
proverTask := provers[i].getProverTask(t, proofType)
|
||||||
|
assert.NotNil(t, proverTask)
|
||||||
|
provers[i].submitProof(t, proverTask, verifiedSuccess, types.Success)
|
||||||
} else {
|
} else {
|
||||||
proofType = message.ProofTypeBatch
|
proofType = message.ProofTypeBatch
|
||||||
|
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
|
||||||
|
proverTask := provers[i].getProverTask(t, proofType)
|
||||||
|
assert.NotNil(t, proverTask)
|
||||||
|
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
|
||||||
}
|
}
|
||||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
|
|
||||||
proverTask := provers[i].getProverTask(t, proofType)
|
|
||||||
assert.NotNil(t, proverTask)
|
|
||||||
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify proof status
|
// verify proof status
|
||||||
@@ -307,17 +311,25 @@ func testInvalidProof(t *testing.T) {
|
|||||||
tickStop = time.Tick(time.Minute)
|
tickStop = time.Tick(time.Minute)
|
||||||
)
|
)
|
||||||
|
|
||||||
var chunkProofStatus types.ProvingStatus
|
var (
|
||||||
var batchProofStatus types.ProvingStatus
|
chunkProofStatus types.ProverProveStatus
|
||||||
|
batchProofStatus types.ProverProveStatus
|
||||||
|
batchFailureType types.ProverTaskFailureType
|
||||||
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tick:
|
case <-tick:
|
||||||
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
chunkProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
|
batchProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
if chunkProofStatus == types.ProvingTaskFailed && batchProofStatus == types.ProvingTaskFailed {
|
batchFailureType, err = proverTaskOrm.GetFailureTypeByTaskID(context.Background(), batch.Hash)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
if chunkProofStatus == types.ProverProofValid &&
|
||||||
|
batchProofStatus == types.ProverProofInvalid &&
|
||||||
|
batchFailureType == types.ProverTaskFailureTypeVerifiedFailed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-tickStop:
|
case <-tickStop:
|
||||||
@@ -342,20 +354,12 @@ func testProofGeneratedFailed(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
|
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// create mock provers.
|
// create mock provers.
|
||||||
provers := make([]*mockProver, 2)
|
provers := make([]*mockProver, 1)
|
||||||
for i := 0; i < len(provers); i++ {
|
for i := 0; i < len(provers); i++ {
|
||||||
var proofType message.ProofType
|
var proofType message.ProofType
|
||||||
if i%2 == 0 {
|
proofType = message.ProofTypeChunk
|
||||||
proofType = message.ProofTypeChunk
|
|
||||||
} else {
|
|
||||||
proofType = message.ProofTypeBatch
|
|
||||||
}
|
|
||||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
|
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
|
||||||
proverTask := provers[i].getProverTask(t, proofType)
|
proverTask := provers[i].getProverTask(t, proofType)
|
||||||
assert.NotNil(t, proverTask)
|
assert.NotNil(t, proverTask)
|
||||||
@@ -370,9 +374,7 @@ func testProofGeneratedFailed(t *testing.T) {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
chunkProofStatus types.ProvingStatus
|
chunkProofStatus types.ProvingStatus
|
||||||
batchProofStatus types.ProvingStatus
|
|
||||||
chunkProverTaskProvingStatus types.ProverProveStatus
|
chunkProverTaskProvingStatus types.ProverProveStatus
|
||||||
batchProverTaskProvingStatus types.ProverProveStatus
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -380,21 +382,17 @@ func testProofGeneratedFailed(t *testing.T) {
|
|||||||
case <-tick:
|
case <-tick:
|
||||||
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
|
if chunkProofStatus == types.ProvingTaskAssigned {
|
||||||
assert.NoError(t, err)
|
|
||||||
if chunkProofStatus == types.ProvingTaskAssigned && batchProofStatus == types.ProvingTaskAssigned {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
chunkProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
|
chunkProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
batchProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
|
if chunkProverTaskProvingStatus == types.ProverProofInvalid {
|
||||||
assert.NoError(t, err)
|
|
||||||
if chunkProverTaskProvingStatus == types.ProverProofInvalid && batchProverTaskProvingStatus == types.ProverProofInvalid {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-tickStop:
|
case <-tickStop:
|
||||||
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String(), "batchProofStatus", batchProofStatus.String())
|
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -414,32 +412,14 @@ func testTimeoutProof(t *testing.T) {
    dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
    assert.NoError(t, err)
    err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
-   assert.NoError(t, err)
-   batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
-   assert.NoError(t, err)
-   err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
-   assert.NoError(t, err)

    // create first chunk & batch mock prover, that will not send any proof.
    chunkProver1 := newMockProver(t, "prover_test"+strconv.Itoa(0), coordinatorURL, message.ProofTypeChunk)
    proverChunkTask := chunkProver1.getProverTask(t, message.ProofTypeChunk)
    assert.NotNil(t, proverChunkTask)

-   batchProver1 := newMockProver(t, "prover_test"+strconv.Itoa(1), coordinatorURL, message.ProofTypeBatch)
-   proverBatchTask := batchProver1.getProverTask(t, message.ProofTypeBatch)
-   assert.NotNil(t, proverBatchTask)
-
-   // verify proof status, it should be assigned, because prover didn't send any proof
-   chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
-   assert.NoError(t, err)
-   assert.Equal(t, chunkProofStatus, types.ProvingTaskAssigned)
-
-   batchProofStatus, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
-   assert.NoError(t, err)
-   assert.Equal(t, batchProofStatus, types.ProvingTaskAssigned)
-
    // wait coordinator to reset the prover task proving status
-   time.Sleep(time.Duration(conf.ProverManager.BatchCollectionTimeSec*2) * time.Second)
+   time.Sleep(time.Duration(conf.ProverManager.ChunkCollectionTimeSec*2) * time.Second)

    // create second mock prover, that will send valid proof.
    chunkProver2 := newMockProver(t, "prover_test"+strconv.Itoa(2), coordinatorURL, message.ProofTypeChunk)
@@ -447,17 +427,8 @@ func testTimeoutProof(t *testing.T) {
    assert.NotNil(t, proverChunkTask2)
    chunkProver2.submitProof(t, proverChunkTask2, verifiedSuccess, types.Success)

-   batchProver2 := newMockProver(t, "prover_test"+strconv.Itoa(3), coordinatorURL, message.ProofTypeBatch)
-   proverBatchTask2 := batchProver2.getProverTask(t, message.ProofTypeBatch)
-   assert.NotNil(t, proverBatchTask2)
-   batchProver2.submitProof(t, proverBatchTask2, verifiedSuccess, types.Success)
-
    // verify proof status, it should be verified now, because second prover sent valid proof
    chunkProofStatus2, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
    assert.NoError(t, err)
    assert.Equal(t, chunkProofStatus2, types.ProvingTaskVerified)
-
-   batchProofStatus2, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
-   assert.NoError(t, err)
-   assert.Equal(t, batchProofStatus2, types.ProvingTaskVerified)
}
@@ -24,6 +24,8 @@ create table chunk
    prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
    proved_at TIMESTAMP(0) DEFAULT NULL,
    proof_time_sec INTEGER DEFAULT NULL,
+   total_attempts SMALLINT NOT NULL DEFAULT 0,
+   active_attempts SMALLINT NOT NULL DEFAULT 0,

    -- batch
    batch_hash VARCHAR DEFAULT NULL,
@@ -50,6 +52,10 @@ on chunk (hash) where deleted_at IS NULL;
create index batch_hash_index
    on chunk (batch_hash) where deleted_at IS NULL;

+create index chunk_idx_total_attempts_and_active_attempts_and_end_block_number
+    on chunk (total_attempts, active_attempts, end_block_number)
+    where deleted_at IS NULL;
+
-- +goose StatementEnd

-- +goose Down
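The new partial index on (total_attempts, active_attempts, end_block_number) is shaped for an assignment-style lookup over unfinished chunk tasks. The following is a minimal gorm sketch of such a query; the trimmed struct, the helper name, and the attempt caps are illustrative assumptions, not the coordinator's actual code.

package example

import (
    "context"

    "gorm.io/gorm"
)

// chunkTask is a trimmed-down, illustrative view of the chunk table; it is not
// the project's Chunk model.
type chunkTask struct {
    Hash           string `gorm:"column:hash"`
    TotalAttempts  int16  `gorm:"column:total_attempts"`
    ActiveAttempts int16  `gorm:"column:active_attempts"`
    EndBlockNumber uint64 `gorm:"column:end_block_number"`
}

func (chunkTask) TableName() string { return "chunk" }

// nextAssignableChunk picks the oldest unfinished chunk that still has attempt
// budget left, matching the shape of the partial index on
// (total_attempts, active_attempts, end_block_number). The caps are assumed values.
func nextAssignableChunk(ctx context.Context, db *gorm.DB, maxTotal, maxActive int16) (*chunkTask, error) {
    var task chunkTask
    err := db.WithContext(ctx).
        Where("total_attempts < ?", maxTotal).
        Where("active_attempts < ?", maxActive).
        Where("deleted_at IS NULL").
        Order("end_block_number ASC").
        First(&task).Error
    if err != nil {
        return nil, err
    }
    return &task, nil
}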
@@ -22,6 +22,8 @@ create table batch
    prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
    proved_at TIMESTAMP(0) DEFAULT NULL,
    proof_time_sec INTEGER DEFAULT NULL,
+   total_attempts SMALLINT NOT NULL DEFAULT 0,
+   active_attempts SMALLINT NOT NULL DEFAULT 0,

    -- rollup
    rollup_status SMALLINT NOT NULL DEFAULT 1,
@@ -46,6 +48,10 @@ on batch (index) where deleted_at IS NULL;
create unique index batch_hash_uindex
    on batch (hash) where deleted_at IS NULL;

+create index batch_idx_total_attempts_and_active_attempts_and_chunk_proofs_status
+    on batch (total_attempts, active_attempts, chunk_proofs_status)
+    where deleted_at IS NULL;
+
comment
    on column batch.chunk_proofs_status is 'undefined, pending, ready';

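The matching batch index suggests the counters are bumped whenever a batch task is handed out. Below is a sketch of such an update using gorm; the helper name and the caps are hypothetical and not taken from the repository.

package example

import (
    "context"

    "gorm.io/gorm"
)

// markBatchAssigned sketches how the new counters could be bumped when a batch
// proving task is handed to a prover: both counters are incremented in one
// guarded UPDATE so concurrent assignments cannot exceed the caps. The helper
// name and the caps are illustrative assumptions.
func markBatchAssigned(ctx context.Context, db *gorm.DB, hash string, maxTotal, maxActive int16) (bool, error) {
    result := db.WithContext(ctx).
        Table("batch").
        Where("hash = ?", hash).
        Where("total_attempts < ?", maxTotal).
        Where("active_attempts < ?", maxActive).
        Updates(map[string]interface{}{
            "total_attempts":  gorm.Expr("total_attempts + 1"),
            "active_attempts": gorm.Expr("active_attempts + 1"),
        })
    if result.Error != nil {
        return false, result.Error
    }
    // Zero rows affected means the caps were already reached (or the hash is unknown).
    return result.RowsAffected > 0, nil
}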
@@ -193,6 +193,7 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7 h1:6IrxszG5G+O7zhtkWxq6+unVvnrm1fqV2Pe+T95DUzw=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7/go.mod h1:gFnFS95y8HstDP6P9pPwzrxOOC5TRDkwbM+ao15ChAI=
+github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80 h1:DuBDHVjgGMPki7bAyh91+3cF1Vh34sAEdH8JQgbc2R0=
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80/go.mod h1:gzbVz57IDJgQ9rLQwfSk696JGWof8ftznEL9GoAv3NI=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -238,6 +239,7 @@ github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf h1:Yt+4K30SdjOkRoRRm3v
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7 h1:kgvzE5wLsLa7XKfV85VZl40QXaMCaeFtHpPwJ8fhotY=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs=
+github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3 h1:+3HCtB74++ClLy8GgjUQYeC8R4ILzVcIe8+5edAJJnE=
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7 h1:tYwu/z8Y0NkkzGEh3z21mSWggMg4LwLRFucLS7TjARg=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
@@ -277,6 +279,7 @@ github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILD
github.com/gavv/httpexpect v2.0.0+incompatible h1:1X9kcRshkSKEjNJJxX9Y9mQ5BRfbxU5kORdjhlA1yX8=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732 h1:AB7YjNrzlVHsYz06zCULVV2zYCEft82P86dSmtwxKL0=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732/go.mod h1:o/XfIXWi4/GqbQirfRm5uTbXMG5NpqxkxblnbZ+QM9I=
+github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b h1:vMT47RYsrftsHSTQhqXwC3BYflo38OLC3Y4LtXtLyU0=
github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b/go.mod h1:CDncRYVRSDqwakm282WEkjfaAj1hxU/v5RXxk5nXOiI=
github.com/getkin/kin-openapi v0.61.0 h1:6awGqF5nG5zkVpMsAih1QH4VgzS8phTxECUWIFo7zko=
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM=
@@ -374,6 +377,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 h1:Ak8CrdlwwXwAZxzS66vgPt4U8yUZX7JwLvVR58FN5jM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
+github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
@@ -472,6 +476,7 @@ github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQ
github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4=
github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
+github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c h1:AqsttAyEyIEsNz5WLRwuRwjiT5CMDUfLk6cFJDVPebs=
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/kataras/blocks v0.0.7 h1:cF3RDY/vxnSRezc7vLFlQFTYXG/yAr1o7WImJuZbzC4=
github.com/kataras/blocks v0.0.7/go.mod h1:UJIU97CluDo0f+zEjbnbkeMRlvYORtmc1304EeyXf4I=
@@ -711,6 +716,7 @@ go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4A
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
+go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
@@ -782,6 +788,7 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
+golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -37,8 +37,11 @@ func NewCoordinatorClient(cfg *config.CoordinatorConfig, proverName string, priv
        SetBaseURL(cfg.BaseURL).
        AddRetryCondition(func(r *resty.Response, _ error) bool {
            // Check for HTTP 5xx errors, e.g., coordinator is restarting.
-           log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
-           return r.StatusCode() >= http.StatusInternalServerError
+           if r.StatusCode() >= http.StatusInternalServerError {
+               log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
+               return true
+           }
+           return false
        })

    log.Info("successfully initialized prover client",
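The reworked retry condition now logs only when a retry will actually happen (HTTP 5xx), instead of on every response. A self-contained sketch of the same predicate follows; the function name is an assumption for illustration.

package main

import (
    "fmt"
    "net/http"
)

// shouldRetry mirrors the reworked condition: log and retry only on HTTP 5xx
// (e.g. while the coordinator is restarting); other responses are not retried.
func shouldRetry(statusCode int) bool {
    if statusCode >= http.StatusInternalServerError {
        fmt.Println("Received unexpected HTTP response. Retrying...", "status code", statusCode)
        return true
    }
    return false
}

func main() {
    fmt.Println(shouldRetry(http.StatusBadGateway))   // true: 502 triggers a retry
    fmt.Println(shouldRetry(http.StatusUnauthorized)) // false: 401 is surfaced to the caller
}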