Compare commits

..

13 Commits

Author SHA1 Message Date
georgehao
06dd3f6100 feat: update 2023-08-17 17:46:00 +08:00
georgehao
aa6c216405 feat: fix bug 2023-08-17 17:08:50 +08:00
colinlyguo
308f8f0502 format 2023-08-17 15:34:34 +08:00
colin
e92e2d54ba Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-17 15:29:47 +08:00
colinlyguo
a7870fc19c add ProverTaskFailureTypeValidatedFailed failed type 2023-08-16 17:05:30 +08:00
colinlyguo
5dc77d98d9 tweak 2023-08-16 17:00:50 +08:00
colin
158655dadb Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-16 16:50:12 +08:00
colinlyguo
2350802af9 add index 2023-08-16 16:49:08 +08:00
colinlyguo
5e7dbbce71 chore: auto version bump [bot] 2023-08-16 08:31:10 +00:00
colinlyguo
49376db8e3 address comments 2023-08-16 16:30:38 +08:00
colin
360f5ef72b Merge branch 'develop' into feat-coordinator-multi-provers-per-task- 2023-08-16 15:43:31 +08:00
colinlyguo
a88d1428bc address comments 2023-08-16 15:41:34 +08:00
colinlyguo
5f6e50d883 feat(coordinator): supporting multiple provers per task 2023-08-16 13:51:03 +08:00
23 changed files with 362 additions and 404 deletions

View File

@@ -40,6 +40,8 @@ type Batch struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// rollup
RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -267,6 +269,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
BatchHeader: batchHeader.Encode(),
ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
RollupStatus: int16(types.RollupPending),
OracleStatus: int16(types.GasOraclePending),
}

View File

@@ -37,6 +37,8 @@ type Chunk struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// batch
BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -175,6 +177,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
ParentChunkStateRoot: parentChunkStateRoot,
WithdrawRoot: chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
}
db := o.db

View File

@@ -5,6 +5,7 @@ import (
"math/big"
"testing"
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
"github.com/scroll-tech/go-ethereum/common"
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
@@ -12,7 +13,6 @@ import (
"scroll-tech/common/database"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils"
"scroll-tech/bridge/internal/config"
"scroll-tech/bridge/internal/controller/relayer"
@@ -91,22 +91,20 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.NotEmpty(t, batch.CommitTxHash)
assert.Equal(t, types.RollupCommitting, types.RollupStatus(batch.RollupStatus))
success := utils.TryTimes(30, func() bool {
var receipt *gethTypes.Receipt
receipt, err = l1Client.TransactionReceipt(context.Background(), common.HexToHash(batch.CommitTxHash))
return err == nil && receipt.Status == 1
})
assert.True(t, success)
assert.NoError(t, err)
commitTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(batch.CommitTxHash))
assert.NoError(t, err)
commitTxReceipt, err := bind.WaitMined(context.Background(), l1Client, commitTx)
assert.NoError(t, err)
assert.Equal(t, len(commitTxReceipt.Logs), 1)
// fetch rollup events
success = utils.TryTimes(30, func() bool {
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
var statuses []types.RollupStatus
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
return err == nil && len(statuses) == 1 && types.RollupCommitted == statuses[0]
})
assert.True(t, success)
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
statuses, err := batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupCommitted, statuses[0])
// add dummy proof
proof := &message.BatchProof{
@@ -120,7 +118,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
// process committed batch and check status
l2Relayer.ProcessCommittedBatches()
statuses, err := batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupFinalizing, statuses[0])
@@ -130,20 +128,17 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.NotNil(t, batch)
assert.NotEmpty(t, batch.FinalizeTxHash)
success = utils.TryTimes(30, func() bool {
var receipt *gethTypes.Receipt
receipt, err = l1Client.TransactionReceipt(context.Background(), common.HexToHash(batch.FinalizeTxHash))
return err == nil && receipt.Status == 1
})
assert.True(t, success)
finalizeTx, _, err := l1Client.TransactionByHash(context.Background(), common.HexToHash(batch.FinalizeTxHash))
assert.NoError(t, err)
finalizeTxReceipt, err := bind.WaitMined(context.Background(), l1Client, finalizeTx)
assert.NoError(t, err)
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
// fetch rollup events
success = utils.TryTimes(30, func() bool {
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
var statuses []types.RollupStatus
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
return err == nil && len(statuses) == 1 && types.RollupFinalized == statuses[0]
})
assert.True(t, success)
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
statuses, err = batchOrm.GetRollupStatusByHashList(context.Background(), []string{batchHash})
assert.NoError(t, err)
assert.Equal(t, 1, len(statuses))
assert.Equal(t, types.RollupFinalized, statuses[0])
}

View File

@@ -103,6 +103,10 @@ const (
ProverTaskFailureTypeUndefined ProverTaskFailureType = iota
// ProverTaskFailureTypeTimeout prover task failure of timeout
ProverTaskFailureTypeTimeout
// ProverTaskFailureTypeValidatedFailed prover task failure of validated failed by coordinator
ProverTaskFailureTypeValidatedFailed
// ProverTaskFailureTypeVerifiedFailed prover task failure of verified failed by coordinator
ProverTaskFailureTypeVerifiedFailed
)
func (r ProverTaskFailureType) String() string {
@@ -111,8 +115,12 @@ func (r ProverTaskFailureType) String() string {
return "prover task failure undefined"
case ProverTaskFailureTypeTimeout:
return "prover task failure timeout"
case ProverTaskFailureTypeValidatedFailed:
return "prover task failure validated failed"
case ProverTaskFailureTypeVerifiedFailed:
return "prover task failure verified failed"
default:
return "illegal prover task failure type"
return fmt.Sprintf("illegal prover task failure type (%d)", int32(r))
}
}

View File

@@ -13,18 +13,6 @@ import (
"github.com/scroll-tech/go-ethereum/rlp"
)
// ProofFailureType the proof failure type
type ProofFailureType int
const (
// ProofFailureUndefined the undefined type proof failure type
ProofFailureUndefined ProofFailureType = iota
// ProofFailurePanic proof failure for prover panic
ProofFailurePanic
// ProofFailureNoPanic proof failure for no prover panic
ProofFailureNoPanic
)
// RespStatus represents status code from prover to scroll
type RespStatus uint32

View File

@@ -6,7 +6,7 @@ import (
"strings"
)
var tag = "v4.1.63"
var tag = "v4.1.60"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -64,7 +64,7 @@ func (spc *SubmitProofController) SubmitProof(ctx *gin.Context) {
proofMsg.BatchProof = &tmpBatchProof
}
if err := spc.submitProofReceiverLogic.HandleZkProof(ctx, &proofMsg, spp); err != nil {
if err := spc.submitProofReceiverLogic.HandleZkProof(ctx, &proofMsg); err != nil {
nerr := fmt.Errorf("handle zk proof failure, err:%w", err)
coodinatorType.RenderJSON(ctx, types.ErrCoordinatorHandleZkProofFailure, nerr, nil)
return

View File

@@ -146,13 +146,7 @@ func (c *Collector) timeoutChunkProofTask() {
}
func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout prometheus.Counter) {
// here not update the block batch proving status failed, because the collector loop will check
// the attempt times. if reach the times, the collector will set the block batch proving status.
for _, assignedProverTask := range assignedProverTasks {
if c.proverTaskOrm.TaskTimeoutMoreThanOnce(c.ctx, assignedProverTask.TaskID) {
log.Warn("Task timeout more than once", "hash", assignedProverTask.TaskID)
}
timeout.Inc()
log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
@@ -160,26 +154,55 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
// update prover task proving status as ProverProofInvalid
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
log.Error("update prover task proving status failure",
"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
"prover proving status", types.ProverProofInvalid, "err", err)
return err
}
// update prover task failure type
// update prover task failure type as ProverTaskFailureTypeTimeout
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
log.Error("update prover task failure type failure",
"hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey,
"prover failure type", types.ProverTaskFailureTypeTimeout, "err", err)
return err
}
// update the task to unassigned, let collector restart it
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
if err := c.chunkOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of chunk failure", "hash", assignedProverTask.TaskID, "err", err)
return err
}
}
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
if err := c.batchOrm.DecreaseActiveAttemptsByHash(c.ctx, assignedProverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of batch failure", "hash", assignedProverTask.TaskID, "err", err)
return err
}
}
var failedAssignmentCount uint64
failedAssignmentCount, err := c.proverTaskOrm.GetFailedTaskAssignmentCount(c.ctx, assignedProverTask.TaskID, tx)
if err != nil {
log.Error("get failed task assignment count failure", "taskID", assignedProverTask.TaskID, "err", err)
return err
}
if failedAssignmentCount >= uint64(c.cfg.ProverManager.SessionAttempts) {
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
log.Error("update chunk proving status failure", "hash", assignedProverTask.TaskID,
"status", types.ProvingTaskFailed, "err", err)
return err
}
}
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskFailed, tx); err != nil {
log.Error("update batch proving status failure", "hash", assignedProverTask.TaskID,
"status", types.ProvingTaskFailed, "err", err)
return err
}
}
}
return nil

View File

@@ -24,7 +24,12 @@ import (
// BatchProverTask is prover task implement for batch proof
type BatchProverTask struct {
BaseProverTask
cfg *config.Config
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
proverTaskOrm *orm.ProverTask
batchAttemptsExceedTotal prometheus.Counter
batchTaskGetTaskTotal prometheus.Counter
@@ -33,13 +38,11 @@ type BatchProverTask struct {
// NewBatchProverTask new a batch collector
func NewBatchProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *BatchProverTask {
bp := &BatchProverTask{
BaseProverTask: BaseProverTask{
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
proverTaskOrm: orm.NewProverTask(db),
},
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
proverTaskOrm: orm.NewProverTask(db),
batchAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_batch_attempts_exceed_total",
Help: "Total number of batch attempts exceed.",
@@ -81,27 +84,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
}
batchTasks, err := bp.batchOrm.UpdateUnassignedBatchReturning(ctx, 1)
// load and send a batch task
batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, bp.cfg.ProverManager.ProversPerSession, bp.cfg.ProverManager.SessionAttempts)
if err != nil {
return nil, fmt.Errorf("failed to get unassigned batch proving tasks, error:%w", err)
}
if len(batchTasks) == 0 {
if batchTask == nil {
return nil, nil
}
if len(batchTasks) != 1 {
return nil, fmt.Errorf("get unassigned batch proving task len not 1, batch tasks:%v", batchTasks)
}
batchTask := batchTasks[0]
log.Info("start batch proof generation session", "id", batchTask.Hash, "public key", publicKey, "prover name", proverName)
if !bp.checkAttemptsExceeded(batchTask.Hash, message.ProofTypeBatch) {
bp.batchAttemptsExceedTotal.Inc()
return nil, fmt.Errorf("the batch task id:%s check attempts have reach the maximum", batchTask.Hash)
}
proverTask := orm.ProverTask{
TaskID: batchTask.Hash,
ProverPublicKey: publicKey.(string),

View File

@@ -24,7 +24,12 @@ import (
// ChunkProverTask the chunk prover task
type ChunkProverTask struct {
BaseProverTask
cfg *config.Config
db *gorm.DB
chunkOrm *orm.Chunk
blockOrm *orm.L2Block
proverTaskOrm *orm.ProverTask
chunkAttemptsExceedTotal prometheus.Counter
chunkTaskGetTaskTotal prometheus.Counter
@@ -33,14 +38,11 @@ type ChunkProverTask struct {
// NewChunkProverTask new a chunk prover task
func NewChunkProverTask(cfg *config.Config, db *gorm.DB, reg prometheus.Registerer) *ChunkProverTask {
cp := &ChunkProverTask{
BaseProverTask: BaseProverTask{
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
blockOrm: orm.NewL2Block(db),
proverTaskOrm: orm.NewProverTask(db),
},
db: db,
cfg: cfg,
chunkOrm: orm.NewChunk(db),
blockOrm: orm.NewL2Block(db),
proverTaskOrm: orm.NewProverTask(db),
chunkAttemptsExceedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_chunk_attempts_exceed_total",
Help: "Total number of chunk attempts exceed.",
@@ -82,29 +84,18 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task", publicKey)
}
// load and send chunk tasks
chunkTasks, err := cp.chunkOrm.UpdateUnassignedChunkReturning(ctx, getTaskParameter.ProverHeight, 1)
// load and send a chunk task
chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, cp.cfg.ProverManager.ProversPerSession, cp.cfg.ProverManager.SessionAttempts)
if err != nil {
return nil, fmt.Errorf("failed to get unassigned chunk proving tasks, error:%w", err)
}
if len(chunkTasks) == 0 {
if chunkTask == nil {
return nil, nil
}
if len(chunkTasks) != 1 {
return nil, fmt.Errorf("get unassigned chunk proving task len not 1, chunk tasks:%v", chunkTasks)
}
chunkTask := chunkTasks[0]
log.Info("start chunk generation session", "id", chunkTask.Hash, "public key", publicKey, "prover name", proverName)
if !cp.checkAttemptsExceeded(chunkTask.Hash, message.ProofTypeChunk) {
cp.chunkAttemptsExceedTotal.Inc()
return nil, fmt.Errorf("chunk proof hash id:%s check attempts have reach the maximum", chunkTask.Hash)
}
proverTask := orm.ProverTask{
TaskID: chunkTask.Hash,
ProverPublicKey: publicKey.(string),

View File

@@ -1,17 +1,8 @@
package provertask
import (
"context"
"github.com/gin-gonic/gin"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
)
@@ -19,53 +10,3 @@ import (
type ProverTask interface {
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
}
// BaseProverTask a base prover task which contain series functions
type BaseProverTask struct {
cfg *config.Config
ctx context.Context
db *gorm.DB
batchOrm *orm.Batch
chunkOrm *orm.Chunk
blockOrm *orm.L2Block
proverTaskOrm *orm.ProverTask
}
// checkAttempts use the count of prover task info to check the attempts
func (b *BaseProverTask) checkAttemptsExceeded(hash string, taskType message.ProofType) bool {
whereFields := make(map[string]interface{})
whereFields["task_id"] = hash
whereFields["task_type"] = int16(taskType)
proverTasks, err := b.proverTaskOrm.GetProverTasks(b.ctx, whereFields, nil, 0, 0)
if err != nil {
log.Error("get prover task error", "hash id", hash, "error", err)
return true
}
if len(proverTasks) >= int(b.cfg.ProverManager.SessionAttempts) {
log.Warn("proof generation prover task reach the max attempts", "hash", hash)
transErr := b.db.Transaction(func(tx *gorm.DB) error {
switch message.ProofType(proverTasks[0].TaskType) {
case message.ProofTypeChunk:
if err := b.chunkOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
}
case message.ProofTypeBatch:
if err := b.batchOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
}
}
// update the prover task status to let timeout checker don't check it.
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
}
return nil
})
if transErr == nil {
return false
}
}
return true
}

View File

@@ -121,7 +121,7 @@ func NewSubmitProofReceiverLogic(cfg *config.ProverManager, db *gorm.DB, reg pro
// HandleZkProof handle a ZkProof submitted from a prover.
// For now only proving/verifying error will lead to setting status as skipped.
// db/unmarshal errors will not because they are errors on the business logic side.
func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.ProofMsg, proofParameter coordinatorType.SubmitProofParameter) error {
func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.ProofMsg) error {
m.proofReceivedTotal.Inc()
pk := ctx.GetString(coordinatorType.PublicKey)
if len(pk) == 0 {
@@ -144,7 +144,8 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
log.Info("handling zk proof", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proverTask.TaskType, "proof time", proofTimeSec)
if err = m.validator(ctx, proverTask, pk, proofMsg, proofParameter); err != nil {
if err = m.validator(ctx, proverTask, pk, proofMsg); err != nil {
m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeValidatedFailed)
return err
}
@@ -161,7 +162,8 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
if verifyErr != nil || !success {
m.verifierFailureTotal.WithLabelValues(proverVersion).Inc()
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.proofFailure(ctx, proverTask, types.ProverTaskFailureTypeVerifiedFailed)
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
@@ -186,26 +188,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
return nil
}
func (m *ProofReceiverLogic) checkAreAllChunkProofsReady(ctx context.Context, chunkHash string) error {
batchHash, err := m.chunkOrm.GetChunkBatchHash(ctx, chunkHash)
if err != nil {
return err
}
allReady, err := m.chunkOrm.CheckIfBatchChunkProofsAreReady(ctx, batchHash)
if err != nil {
return err
}
if allReady {
err := m.batchOrm.UpdateChunkProofsStatusByBatchHash(ctx, batchHash, types.ChunkProofsStatusReady)
if err != nil {
return err
}
}
return nil
}
func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.ProverTask, pk string, proofMsg *message.ProofMsg, proofParameter coordinatorType.SubmitProofParameter) (err error) {
func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.ProverTask, pk string, proofMsg *message.ProofMsg) (err error) {
defer func() {
if err != nil {
m.validateFailureTotal.Inc()
@@ -232,12 +215,14 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
proofTimeSec := uint64(proofTime.Seconds())
if proofMsg.Status != message.StatusOk {
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.validateFailureProverTaskStatusNotOk.Inc()
log.Info("proof generated by prover failed",
"prove type", proofMsg.Type, "proof id", proofMsg.ID,
"prover name", proverTask.ProverName, "prover version", proverTask.ProverVersion,
"prover pk", "failure type", proofParameter.FailureType, "failure message", proofParameter.FailureMsg)
log.Info("proof generated by prover failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "error", proofMsg.Error)
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proofMsg.Type, proofMsg.ID, pk, types.ProverProofInvalid); updateErr != nil {
log.Error("proof generated by prover failed update prover task proving status failure", "proof id", proofMsg.ID,
"prover name", proverTask.ProverName, "prover pk", pk, "prove type", proofMsg.Type, "error", proofMsg.Error)
}
return ErrValidatorFailureProofMsgStatusNotOk
}
@@ -265,14 +250,42 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
return nil
}
//func (m *ProofReceiverLogic) proofFailure(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
// log.Info("proof failure update proof status", "hash", hash, "public key", pubKey,
// "proof type", proofMsg.Type.String(), "status", types.ProvingTaskFailed.String())
//
// if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskFailed, 0); err != nil {
// log.Error("failed to updated proof status ProvingTaskFailed", "hash", hash, "pubKey", pubKey, "error", err)
// }
//}
func (m *ProofReceiverLogic) proofFailure(ctx context.Context, proverTask *orm.ProverTask, failedType types.ProverTaskFailureType) {
err := m.db.Transaction(func(tx *gorm.DB) error {
// update prover task proving status as ProverProofInvalid
if err := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, message.ProofType(proverTask.TaskType),
proverTask.TaskID, proverTask.ProverPublicKey, types.ProverProofInvalid); err != nil {
log.Error("update prover task proving status failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
"prover proving status", types.ProverProofInvalid, "err", err)
return err
}
// update prover task failure type as ProverTaskFailureTypeVerifiedFailed
if err := m.proverTaskOrm.UpdateProverTaskFailureType(ctx, message.ProofType(proverTask.TaskType),
proverTask.TaskID, proverTask.ProverPublicKey, failedType, tx); err != nil {
log.Error("update prover task failure type failure", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey,
"prover failure type", failedType, "err", err)
return err
}
if message.ProofType(proverTask.TaskType) == message.ProofTypeChunk {
if err := m.chunkOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of chunk failure", "hash", proverTask.TaskID, "err", err)
return err
}
}
if message.ProofType(proverTask.TaskType) == message.ProofTypeBatch {
if err := m.batchOrm.DecreaseActiveAttemptsByHash(ctx, proverTask.TaskID, tx); err != nil {
log.Error("decrease active attempts of batch failure", "hash", proverTask.TaskID, "err", err)
return err
}
}
return nil
})
if err != nil {
log.Error("failed to update proof status as failed", "error", err)
}
}
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
log.Info("proof recover update proof status", "hash", hash, "public key", pubKey,
@@ -332,30 +345,18 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
switch proofMsg.Type {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
log.Error("failed to update chunk proving_status", "msg.ID", hash, "error", err)
return err
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
log.Error("failed to update batch proving_status", "msg.ID", hash, "error", err)
return err
}
}
return nil
})
if err != nil {
return err
}
if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
return checkReadyErr
}
}
return nil
return err
}
func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string, proofType message.ProofType) bool {

View File

@@ -41,6 +41,8 @@ type Batch struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// rollup
RollupStatus int16 `json:"rollup_status" gorm:"column:rollup_status;default:1"`
@@ -191,6 +193,8 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
BatchHeader: batchHeader.Encode(),
ChunkProofsStatus: int16(types.ChunkProofsStatusPending),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
RollupStatus: int16(types.RollupPending),
OracleStatus: int16(types.GasOraclePending),
}
@@ -209,19 +213,6 @@ func (o *Batch) InsertBatch(ctx context.Context, startChunkIndex, endChunkIndex
return &newBatch, nil
}
// UpdateChunkProofsStatusByBatchHash updates the status of chunk_proofs_status field for a given batch hash.
// The function will set the chunk_proofs_status to the status provided.
func (o *Batch) UpdateChunkProofsStatusByBatchHash(ctx context.Context, batchHash string, status types.ChunkProofsStatus) error {
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("hash = ?", batchHash)
if err := db.Update("chunk_proofs_status", status).Error; err != nil {
return fmt.Errorf("Batch.UpdateChunkProofsStatusByBatchHash error: %w, batch hash: %v, status: %v", err, batchHash, status.String())
}
return nil
}
// UpdateProvingStatus updates the proving status of a batch.
func (o *Batch) UpdateProvingStatus(ctx context.Context, hash string, status types.ProvingStatus, dbTX ...*gorm.DB) error {
db := o.db
@@ -275,28 +266,52 @@ func (o *Batch) UpdateProofByHash(ctx context.Context, hash string, proof *messa
return nil
}
// UpdateUnassignedBatchReturning update the unassigned batch and return the update record
func (o *Batch) UpdateUnassignedBatchReturning(ctx context.Context, limit int) ([]*Batch, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Batch{})
subQueryDB = subQueryDB.Select("batch.index")
// Lock the selected row to ensure atomic updates
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("batch.total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("batch.active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Joins("JOIN chunk ON chunk.batch_hash = batch.hash AND chunk.proving_status = ?", types.ProvingTaskVerified)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
// Perform the update and return the modified batch
var updatedBatch Batch
db = db.Model(&updatedBatch).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
result := db.Updates(map[string]interface{}{
"total_attempts": gorm.Expr("total_attempts + ?", 1),
"active_attempts": gorm.Expr("active_attempts + ?", 1),
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if limit == 0 {
if result.RowsAffected == 0 {
return nil, nil
}
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Batch{}).Select("index")
subQueryDB = subQueryDB.Where("proving_status = ? AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(limit)
var batches []*Batch
db = db.Model(&batches).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
return nil, fmt.Errorf("Batch.UpdateUnassignedBatchReturning error: %w", err)
}
return batches, nil
return &updatedBatch, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash.
func (o *Batch) DecreaseActiveAttemptsByHash(ctx context.Context, batchHash string, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("hash = ?", batchHash)
if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
return fmt.Errorf("Batch.DecreaseActiveAttemptsByHash error: %w, batch hash: %v", err, batchHash)
}
return nil
}

View File

@@ -40,6 +40,8 @@ type Chunk struct {
ProverAssignedAt *time.Time `json:"prover_assigned_at" gorm:"column:prover_assigned_at;default:NULL"`
ProvedAt *time.Time `json:"proved_at" gorm:"column:proved_at;default:NULL"`
ProofTimeSec int32 `json:"proof_time_sec" gorm:"column:proof_time_sec;default:NULL"`
TotalAttempts int16 `json:"total_attempts" gorm:"column:total_attempts;default:0"`
ActiveAttempts int16 `json:"active_attempts" gorm:"column:active_attempts;default:0"`
// batch
BatchHash string `json:"batch_hash" gorm:"column:batch_hash;default:NULL"`
@@ -168,19 +170,6 @@ func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) {
return chunks, nil
}
// CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected.
func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("batch_hash = ? AND proving_status != ?", batchHash, types.ProvingTaskVerified)
var count int64
if err := db.Count(&count).Error; err != nil {
return false, fmt.Errorf("Chunk.CheckIfBatchChunkProofsAreReady error: %w, batch hash: %v", err, batchHash)
}
return count == 0, nil
}
// GetChunkBatchHash retrieves the batchHash of a given chunk.
func (o *Chunk) GetChunkBatchHash(ctx context.Context, chunkHash string) (string, error) {
db := o.db.WithContext(ctx)
@@ -259,6 +248,8 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *types.Chunk, dbTX ...*go
ParentChunkStateRoot: parentChunkStateRoot,
WithdrawRoot: chunk.Blocks[numBlocks-1].WithdrawRoot.Hex(),
ProvingStatus: int16(types.ProvingTaskUnassigned),
TotalAttempts: 0,
ActiveAttempts: 0,
}
db := o.db
@@ -341,32 +332,52 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
return nil
}
// UpdateUnassignedChunkReturning update the unassigned batch which end_block_number <= height and return the update record
func (o *Chunk) UpdateUnassignedChunkReturning(ctx context.Context, height, limit int) ([]*Chunk, error) {
if height <= 0 {
return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: height must be larger than zero")
// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, proverBlockHeight int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Chunk{})
subQueryDB = subQueryDB.Select("index")
// Lock the selected row to ensure atomic updates
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("end_block_number <= ?", proverBlockHeight)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
// Perform the update and return the modified chunk
var updatedChunk Chunk
db = db.Model(&updatedChunk).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
result := db.Updates(map[string]interface{}{
"total_attempts": gorm.Expr("total_attempts + ?", 1),
"active_attempts": gorm.Expr("active_attempts + ?", 1),
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if limit < 0 {
return nil, errors.New("Chunk.UpdateUnassignedBatchReturning error: limit must not be smaller than zero")
}
if limit == 0 {
if result.RowsAffected == 0 {
return nil, nil
}
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Chunk{}).Select("index")
subQueryDB = subQueryDB.Where("proving_status = ?", types.ProvingTaskUnassigned)
subQueryDB = subQueryDB.Where("end_block_number <= ?", height)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(limit)
var chunks []*Chunk
db = db.Model(&chunks).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
if err := db.Update("proving_status", types.ProvingTaskAssigned).Error; err != nil {
return nil, fmt.Errorf("Chunk.UpdateUnassignedBatchReturning error: %w", err)
}
return chunks, nil
return &updatedChunk, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.
func (o *Chunk) DecreaseActiveAttemptsByHash(ctx context.Context, chunkHash string, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("hash = ?", chunkHash)
if err := db.UpdateColumn("active_attempts", gorm.Expr("active_attempts - ?", 1)).Error; err != nil {
return fmt.Errorf("Chunk.DecreaseActiveAttemptsByHash error: %w, chunk hash: %v", err, chunkHash)
}
return nil
}

View File

@@ -94,6 +94,24 @@ func (o *ProverTask) GetProverTasks(ctx context.Context, fields map[string]inter
return proverTasks, nil
}
// GetFailedTaskAssignmentCount returns the number of times a task with the specified TaskID has been assigned and failed.
func (o *ProverTask) GetFailedTaskAssignmentCount(ctx context.Context, taskID string, dbTX ...*gorm.DB) (uint64, error) {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id = ?", taskID)
db = db.Where("proving_status = ?", types.ProverProofInvalid)
var count int64
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("ProverTask.GetTaskAssignmentCount failed, taskID: %v, err: %w", taskID, err)
}
return uint64(count), nil
}
// GetProverTasksByHashes retrieves the ProverTask records associated with the specified hashes.
// The returned prover task objects are sorted in ascending order by their ids.
func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, hashes []string) ([]*ProverTask, error) {
@@ -143,6 +161,20 @@ func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskID string
return types.ProverProveStatus(proverTask.ProvingStatus), nil
}
// GetFailureTypeByTaskID retrieves the failure type of a prover task
func (o *ProverTask) GetFailureTypeByTaskID(ctx context.Context, taskID string) (types.ProverTaskFailureType, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Select("failure_type")
db = db.Where("task_id = ?", taskID)
var proverTask ProverTask
if err := db.Find(&proverTask).Error; err != nil {
return types.ProverTaskFailureTypeUndefined, fmt.Errorf("ProverTask.GetFailureTypeByTaskID error: %w, taskID: %v", err, taskID)
}
return types.ProverTaskFailureType(proverTask.FailureType), nil
}
// GetTimeoutAssignedProverTasks get the timeout and assigned proving_status prover task
func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit int, taskType message.ProofType, timeout time.Duration) ([]ProverTask, error) {
db := o.db.WithContext(ctx)
@@ -160,32 +192,14 @@ func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit in
return proverTasks, nil
}
// TaskTimeoutMoreThanOnce get the timeout twice task. a temp design
func (o *ProverTask) TaskTimeoutMoreThanOnce(ctx context.Context, taskID string) bool {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_id", taskID)
db = db.Where("proving_status", int(types.ProverProofInvalid))
var count int64
if err := db.Count(&count).Error; err != nil {
return true
}
if count >= 1 {
return true
}
return false
}
// SetProverTask updates or inserts a ProverTask record.
func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask, dbTX ...*gorm.DB) error {
db := o.db.WithContext(ctx)
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "task_type"}, {Name: "task_id"}, {Name: "prover_public_key"}, {Name: "prover_version"}},

View File

@@ -2,10 +2,8 @@ package types
// SubmitProofParameter the SubmitProof api request parameter
type SubmitProofParameter struct {
TaskID string `form:"task_id" json:"task_id" binding:"required"`
TaskType int `form:"task_type" json:"task_type" binding:"required"`
Status int `form:"status" json:"status"`
Proof string `form:"proof" json:"proof"`
FailureType int `form:"failure_type" json:"failure_type"`
FailureMsg string `form:"failure_msg" json:"failure_msg"`
TaskID string `form:"task_id" json:"task_id" binding:"required"`
TaskType int `form:"task_type" json:"task_type" binding:"required"`
Status int `form:"status" json:"status"`
Proof string `form:"proof" json:"proof"`
}

View File

@@ -283,7 +283,7 @@ func testInvalidProof(t *testing.T) {
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, batch.Hash)
assert.NoError(t, err)
// create mock provers.
@@ -292,13 +292,17 @@ func testInvalidProof(t *testing.T) {
var proofType message.ProofType
if i%2 == 0 {
proofType = message.ProofTypeChunk
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedSuccess, types.Success)
} else {
proofType = message.ProofTypeBatch
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
}
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
provers[i].submitProof(t, proverTask, verifiedFailed, types.ErrCoordinatorHandleZkProofFailure)
}
// verify proof status
@@ -307,17 +311,25 @@ func testInvalidProof(t *testing.T) {
tickStop = time.Tick(time.Minute)
)
var chunkProofStatus types.ProvingStatus
var batchProofStatus types.ProvingStatus
var (
chunkProofStatus types.ProverProveStatus
batchProofStatus types.ProverProveStatus
batchFailureType types.ProverTaskFailureType
)
for {
select {
case <-tick:
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
chunkProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
batchProofStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProvingTaskUnassigned && batchProofStatus == types.ProvingTaskUnassigned {
batchFailureType, err = proverTaskOrm.GetFailureTypeByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProverProofValid &&
batchProofStatus == types.ProverProofInvalid &&
batchFailureType == types.ProverTaskFailureTypeVerifiedFailed {
return
}
case <-tickStop:
@@ -342,20 +354,12 @@ func testProofGeneratedFailed(t *testing.T) {
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
// create mock provers.
provers := make([]*mockProver, 2)
provers := make([]*mockProver, 1)
for i := 0; i < len(provers); i++ {
var proofType message.ProofType
if i%2 == 0 {
proofType = message.ProofTypeChunk
} else {
proofType = message.ProofTypeBatch
}
proofType = message.ProofTypeChunk
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), coordinatorURL, proofType)
proverTask := provers[i].getProverTask(t, proofType)
assert.NotNil(t, proverTask)
@@ -370,9 +374,7 @@ func testProofGeneratedFailed(t *testing.T) {
var (
chunkProofStatus types.ProvingStatus
batchProofStatus types.ProvingStatus
chunkProverTaskProvingStatus types.ProverProveStatus
batchProverTaskProvingStatus types.ProverProveStatus
)
for {
@@ -380,21 +382,17 @@ func testProofGeneratedFailed(t *testing.T) {
case <-tick:
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProofStatus, err = batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProofStatus == types.ProvingTaskAssigned && batchProofStatus == types.ProvingTaskAssigned {
if chunkProofStatus == types.ProvingTaskAssigned {
return
}
chunkProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
batchProverTaskProvingStatus, err = proverTaskOrm.GetProvingStatusByTaskID(context.Background(), batch.Hash)
assert.NoError(t, err)
if chunkProverTaskProvingStatus == types.ProverProofInvalid && batchProverTaskProvingStatus == types.ProverProofInvalid {
if chunkProverTaskProvingStatus == types.ProverProofInvalid {
return
}
case <-tickStop:
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String(), "batchProofStatus", batchProofStatus.String())
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String())
return
}
}
@@ -414,32 +412,14 @@ func testTimeoutProof(t *testing.T) {
dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
assert.NoError(t, err)
batch, err := batchOrm.InsertBatch(context.Background(), 0, 0, dbChunk.Hash, dbChunk.Hash, []*types.Chunk{chunk})
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
// create first chunk & batch mock prover, that will not send any proof.
chunkProver1 := newMockProver(t, "prover_test"+strconv.Itoa(0), coordinatorURL, message.ProofTypeChunk)
proverChunkTask := chunkProver1.getProverTask(t, message.ProofTypeChunk)
assert.NotNil(t, proverChunkTask)
batchProver1 := newMockProver(t, "prover_test"+strconv.Itoa(1), coordinatorURL, message.ProofTypeBatch)
proverBatchTask := batchProver1.getProverTask(t, message.ProofTypeBatch)
assert.NotNil(t, proverBatchTask)
// verify proof status, it should be assigned, because prover didn't send any proof
chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
assert.Equal(t, chunkProofStatus, types.ProvingTaskAssigned)
batchProofStatus, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
assert.Equal(t, batchProofStatus, types.ProvingTaskAssigned)
// wait coordinator to reset the prover task proving status
time.Sleep(time.Duration(conf.ProverManager.BatchCollectionTimeSec*2) * time.Second)
time.Sleep(time.Duration(conf.ProverManager.ChunkCollectionTimeSec*2) * time.Second)
// create second mock prover, that will send valid proof.
chunkProver2 := newMockProver(t, "prover_test"+strconv.Itoa(2), coordinatorURL, message.ProofTypeChunk)
@@ -447,17 +427,8 @@ func testTimeoutProof(t *testing.T) {
assert.NotNil(t, proverChunkTask2)
chunkProver2.submitProof(t, proverChunkTask2, verifiedSuccess, types.Success)
batchProver2 := newMockProver(t, "prover_test"+strconv.Itoa(3), coordinatorURL, message.ProofTypeBatch)
proverBatchTask2 := batchProver2.getProverTask(t, message.ProofTypeBatch)
assert.NotNil(t, proverBatchTask2)
batchProver2.submitProof(t, proverBatchTask2, verifiedSuccess, types.Success)
// verify proof status, it should be verified now, because second prover sent valid proof
chunkProofStatus2, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
assert.NoError(t, err)
assert.Equal(t, chunkProofStatus2, types.ProvingTaskVerified)
batchProofStatus2, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
assert.NoError(t, err)
assert.Equal(t, batchProofStatus2, types.ProvingTaskVerified)
}

View File

@@ -24,6 +24,8 @@ create table chunk
prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
proved_at TIMESTAMP(0) DEFAULT NULL,
proof_time_sec INTEGER DEFAULT NULL,
total_attempts SMALLINT NOT NULL DEFAULT 0,
active_attempts SMALLINT NOT NULL DEFAULT 0,
-- batch
batch_hash VARCHAR DEFAULT NULL,
@@ -50,6 +52,10 @@ on chunk (hash) where deleted_at IS NULL;
create index batch_hash_index
on chunk (batch_hash) where deleted_at IS NULL;
create index chunk_idx_total_attempts_and_active_attempts_and_end_block_number
on chunk (total_attempts, active_attempts, end_block_number)
where deleted_at IS NULL;
-- +goose StatementEnd
-- +goose Down

View File

@@ -22,6 +22,8 @@ create table batch
prover_assigned_at TIMESTAMP(0) DEFAULT NULL,
proved_at TIMESTAMP(0) DEFAULT NULL,
proof_time_sec INTEGER DEFAULT NULL,
total_attempts SMALLINT NOT NULL DEFAULT 0,
active_attempts SMALLINT NOT NULL DEFAULT 0,
-- rollup
rollup_status SMALLINT NOT NULL DEFAULT 1,
@@ -46,6 +48,10 @@ on batch (index) where deleted_at IS NULL;
create unique index batch_hash_uindex
on batch (hash) where deleted_at IS NULL;
create index batch_idx_total_attempts_and_active_attempts_and_chunk_proofs_status
on batch (total_attempts, active_attempts, chunk_proofs_status)
where deleted_at IS NULL;
comment
on column batch.chunk_proofs_status is 'undefined, pending, ready';

View File

@@ -193,6 +193,7 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7 h1:6IrxszG5G+O7zhtkWxq6+unVvnrm1fqV2Pe+T95DUzw=
github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7/go.mod h1:gFnFS95y8HstDP6P9pPwzrxOOC5TRDkwbM+ao15ChAI=
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80 h1:DuBDHVjgGMPki7bAyh91+3cF1Vh34sAEdH8JQgbc2R0=
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80/go.mod h1:gzbVz57IDJgQ9rLQwfSk696JGWof8ftznEL9GoAv3NI=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -238,6 +239,7 @@ github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf h1:Yt+4K30SdjOkRoRRm3v
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7 h1:kgvzE5wLsLa7XKfV85VZl40QXaMCaeFtHpPwJ8fhotY=
github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs=
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3 h1:+3HCtB74++ClLy8GgjUQYeC8R4ILzVcIe8+5edAJJnE=
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7 h1:tYwu/z8Y0NkkzGEh3z21mSWggMg4LwLRFucLS7TjARg=
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
@@ -277,6 +279,7 @@ github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILD
github.com/gavv/httpexpect v2.0.0+incompatible h1:1X9kcRshkSKEjNJJxX9Y9mQ5BRfbxU5kORdjhlA1yX8=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732 h1:AB7YjNrzlVHsYz06zCULVV2zYCEft82P86dSmtwxKL0=
github.com/gballet/go-verkle v0.0.0-20220902153445-097bd83b7732/go.mod h1:o/XfIXWi4/GqbQirfRm5uTbXMG5NpqxkxblnbZ+QM9I=
github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b h1:vMT47RYsrftsHSTQhqXwC3BYflo38OLC3Y4LtXtLyU0=
github.com/gballet/go-verkle v0.0.0-20230607174250-df487255f46b/go.mod h1:CDncRYVRSDqwakm282WEkjfaAj1hxU/v5RXxk5nXOiI=
github.com/getkin/kin-openapi v0.61.0 h1:6awGqF5nG5zkVpMsAih1QH4VgzS8phTxECUWIFo7zko=
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM=
@@ -374,6 +377,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99 h1:Ak8CrdlwwXwAZxzS66vgPt4U8yUZX7JwLvVR58FN5jM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
@@ -472,6 +476,7 @@ github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQ
github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4=
github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c h1:AqsttAyEyIEsNz5WLRwuRwjiT5CMDUfLk6cFJDVPebs=
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU=
github.com/kataras/blocks v0.0.7 h1:cF3RDY/vxnSRezc7vLFlQFTYXG/yAr1o7WImJuZbzC4=
github.com/kataras/blocks v0.0.7/go.mod h1:UJIU97CluDo0f+zEjbnbkeMRlvYORtmc1304EeyXf4I=
@@ -711,6 +716,7 @@ go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4A
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
@@ -782,6 +788,7 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=

View File

@@ -37,8 +37,11 @@ func NewCoordinatorClient(cfg *config.CoordinatorConfig, proverName string, priv
SetBaseURL(cfg.BaseURL).
AddRetryCondition(func(r *resty.Response, _ error) bool {
// Check for HTTP 5xx errors, e.g., coordinator is restarting.
log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
return r.StatusCode() >= http.StatusInternalServerError
if r.StatusCode() >= http.StatusInternalServerError {
log.Warn("Received unexpected HTTP response. Retrying...", "status code", r.StatusCode())
return true
}
return false
})
log.Info("successfully initialized prover client",

View File

@@ -53,12 +53,10 @@ type GetTaskResponse struct {
// SubmitProofRequest defines the request structure for the SubmitProof API.
type SubmitProofRequest struct {
TaskID string `json:"task_id"`
TaskType int `json:"task_type"`
Status int `json:"status"`
Proof string `json:"proof"`
FailureType int `json:"failure_type,omitempty"`
FailureMsg string `json:"failure_msg,omitempty"`
TaskID string `json:"task_id"`
TaskType int `json:"task_type"`
Status int `json:"status"`
Proof string `json:"proof"`
}
// SubmitProofResponse defines the response structure for the SubmitProof API.

View File

@@ -151,6 +151,13 @@ func (r *Prover) proveAndSubmit() error {
}
}
defer func() {
err = r.stack.Delete(task.Task.ID)
if err != nil {
log.Error("prover stack pop failed!", "err", err)
}
}()
var proofMsg *message.ProofDetail
if task.Times <= 2 {
// If panic times <= 2, try to proof the task.
@@ -161,8 +168,7 @@ func (r *Prover) proveAndSubmit() error {
log.Info("start to prove task", "task-type", task.Task.Type, "task-id", task.Task.ID)
proofMsg, err = r.prove(task)
if err != nil { // handling error from prove
log.Error("failed to prove task", "task_type", task.Task.Type, "task-id", task.Task.ID, "err", err)
return r.submitErr(task, message.ProofFailureNoPanic, err)
return fmt.Errorf("failed to prove task, task-type: %v, err: %v", task.Task.Type, err)
}
return r.submitProof(proofMsg)
@@ -170,11 +176,7 @@ func (r *Prover) proveAndSubmit() error {
// when the prover has more than 3 times panic,
// it will omit to prove the task, submit StatusProofError and then Delete the task.
if err = r.stack.Delete(task.Task.ID); err != nil {
log.Error("prover stack pop failed", "task_type", task.Task.Type, "task_id", task.Task.ID, "err", err)
}
log.Error("zk proving panic for task", "task-type", task.Task.Type, "task-id", task.Task.ID)
return r.submitErr(task, message.ProofFailurePanic, errors.New("zk proving panic for task"))
return fmt.Errorf("zk proving panic for task, task-type: %v, task-id: %v", task.Task.Type, task.Task.ID)
}
// fetchTaskFromCoordinator fetches a new task from the server
@@ -334,27 +336,6 @@ func (r *Prover) submitProof(msg *message.ProofDetail) error {
return nil
}
func (r *Prover) submitErr(task *store.ProvingTask, proofFailureType message.ProofFailureType, err error) error {
// prepare the submit request
req := &client.SubmitProofRequest{
TaskID: task.Task.ID,
TaskType: int(task.Task.Type),
Status: int(message.StatusProofError),
Proof: "",
FailureType: int(proofFailureType),
FailureMsg: err.Error(),
}
// send the submit request
if submitErr := r.coordinatorClient.SubmitProof(r.ctx, req); submitErr != nil {
return fmt.Errorf("error submitting proof: %v", submitErr)
}
log.Info("proof submitted report failure successfully", "task-id", task.Task.ID, "task-type",
task.Task.Type, "task-status", message.StatusProofError, "err", err)
return nil
}
func (r *Prover) getSortedTracesByHashes(blockHashes []common.Hash) ([]*types.BlockTrace, error) {
if len(blockHashes) == 0 {
return nil, fmt.Errorf("blockHashes is empty")