Compare commits

...

17 Commits

Author SHA1 Message Date
Morty
f957b0f9cf Merge branch 'develop' into feat-prover-multi-task 2025-01-06 21:40:08 +08:00
Morty
6b5cf47bde Merge branch 'feat-coordinator-assign-logic' into feat-prover-multi-task 2025-01-06 21:39:05 +08:00
yiweichi
0d1c303934 chore: auto version bump [bot] 2025-01-06 07:10:40 +00:00
Morty
707267a431 fix: lint 2025-01-01 04:44:47 +08:00
Morty
9e637616b7 fix: lint 2025-01-01 04:11:43 +08:00
Morty
3e97105c72 refactor 2025-01-01 04:07:51 +08:00
Morty
4215b6b339 fix: field is never read 2025-01-01 02:52:50 +08:00
Morty
37da7b8c8b chore: remove extra files 2025-01-01 02:46:47 +08:00
Morty
3da7567295 feat(prover): support mutiple version of prover tasks 2025-01-01 02:46:05 +08:00
Morty
e4c0779e73 fix: comments 2024-12-31 01:46:12 +08:00
Morty
1c5d88dc7c fix: orm GetTaskOfProver 2024-12-31 00:35:52 +08:00
Morty
a75075d384 fix: err log 2024-12-31 00:18:18 +08:00
Morty
df926160ba chore: remove extra files 2024-12-30 22:08:56 +08:00
Morty
3e0589e3a8 Merge branch 'develop' into feat-coordinator-assign-logic 2024-12-30 22:06:30 +08:00
Morty
8ce5121786 fix: GetUnassignedBatchCount 2024-12-30 22:06:12 +08:00
Morty
71acdb3b88 fix: lint 2024-12-30 22:00:20 +08:00
Morty
791fcafa60 feat(coordinator): assign static prover first and avoid reassigning failing task to same prover 2024-12-30 21:03:57 +08:00
17 changed files with 331 additions and 146 deletions

View File

@@ -2,6 +2,7 @@
"prover_manager": { "prover_manager": {
"provers_per_session": 1, "provers_per_session": 1,
"session_attempts": 5, "session_attempts": 5,
"external_prover_threshold": 32,
"bundle_collection_time_sec": 180, "bundle_collection_time_sec": 180,
"batch_collection_time_sec": 180, "batch_collection_time_sec": 180,
"chunk_collection_time_sec": 180, "chunk_collection_time_sec": 180,

View File

@@ -16,6 +16,8 @@ type ProverManager struct {
// Number of attempts that a session can be retried if previous attempts failed. // Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here. // Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts"` SessionAttempts uint8 `json:"session_attempts"`
// Threshold for activating the external prover based on unassigned task count.
ExternalProverThreshold int64 `json:"external_prover_threshold"`
// Zk verifier config. // Zk verifier config.
Verifier *VerifierConfig `json:"verifier"` Verifier *VerifierConfig `json:"verifier"`
// BatchCollectionTimeSec batch Proof collection time (in seconds). // BatchCollectionTimeSec batch Proof collection time (in seconds).

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -63,24 +64,43 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}
var batchTask *orm.Batch var batchTask *orm.Batch
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
var getTaskError error var getTaskError error
var tmpBatchTask *orm.Batch var tmpBatchTask *orm.Batch
tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) var assignedOffset, unassignedOffset = 0, 0
tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpBatchTask == nil { tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
for {
tmpBatchTask = nil
if assignedOffset < len(tmpAssignedBatchTasks) {
tmpBatchTask = tmpAssignedBatchTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedBatchTask) {
tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset]
unassignedOffset++
} }
if tmpBatchTask == nil { if tmpBatchTask == nil {
@@ -88,6 +108,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil return nil, nil
} }
// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts) rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
if updateAttemptsErr != nil { if updateAttemptsErr != nil {
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -63,24 +64,43 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}
var bundleTask *orm.Bundle var bundleTask *orm.Bundle
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
var getTaskError error var getTaskError error
var tmpBundleTask *orm.Bundle var tmpBundleTask *orm.Bundle
tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) var assignedOffset, unassignedOffset = 0, 0
tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpBundleTask == nil { tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
for {
tmpBundleTask = nil
if assignedOffset < len(tmpAssignedBundleTasks) {
tmpBundleTask = tmpAssignedBundleTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedBundleTask) {
tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset]
unassignedOffset++
} }
if tmpBundleTask == nil { if tmpBundleTask == nil {
@@ -88,6 +108,17 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
return nil, nil return nil, nil
} }
// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}
rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts) rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
if updateAttemptsErr != nil { if updateAttemptsErr != nil {
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -61,24 +62,43 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if getCountError != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}
var chunkTask *orm.Chunk var chunkTask *orm.Chunk
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
var getTaskError error var getTaskError error
var tmpChunkTask *orm.Chunk var tmpChunkTask *orm.Chunk
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) var assignedOffset, unassignedOffset = 0, 0
tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpChunkTask == nil { tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if getTaskError != nil { if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure return nil, ErrCoordinatorInternalFailure
} }
for {
tmpChunkTask = nil
if assignedOffset < len(tmpAssignedChunkTasks) {
tmpChunkTask = tmpAssignedChunkTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedChunkTask) {
tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset]
unassignedOffset++
} }
if tmpChunkTask == nil { if tmpChunkTask == nil {
@@ -86,6 +106,17 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil return nil, nil
} }
// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil { if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)

View File

@@ -27,6 +27,11 @@ var (
getTaskCounterVec *prometheus.CounterVec = nil getTaskCounterVec *prometheus.CounterVec = nil
) )
var (
// ExternalProverNamePrefix prefix of prover name
ExternalProverNamePrefix = "external"
)
// ProverTask the interface of a collector who send data to prover // ProverTask the interface of a collector who send data to prover
type ProverTask interface { type ProverTask interface {
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)

View File

@@ -78,38 +78,48 @@ func (*Batch) TableName() string {
return "batch" return "batch"
} }
// GetUnassignedBatch retrieves unassigned batch based on the specified limit. // GetUnassignedBatches retrieves unassigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index. // The returned batches are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { func (o *Batch) GetUnassignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
var batch Batch var batch []*Batch
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
err := db.Raw(sql).Scan(&batch).Error err := db.Raw(sql).Scan(&batch).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err) return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
} }
if batch.Hash == "" { return batch, nil
return nil, nil
}
return &batch, nil
} }
// GetAssignedBatch retrieves assigned batch based on the specified limit. // GetUnassignedBatchCount retrieves unassigned batch count based on the specified limit.
// The returned batches are sorted in ascending order by their index. func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { var count int64
var batch Batch
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;", db = db.Model(&Batch{})
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady)) db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
db = db.Where("batch.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err)
}
return count, nil
}
// GetAssignedBatches retrieves assigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetAssignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
var batch []*Batch
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
err := db.Raw(sql).Scan(&batch).Error err := db.Raw(sql).Scan(&batch).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err) return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
} }
if batch.Hash == "" { return batch, nil
return nil, nil
}
return &batch, nil
} }
// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready // GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
@@ -132,19 +142,6 @@ func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset
return batches, nil return batches, nil
} }
// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned.
func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
var assignedBatches []*Batch
if err := db.Find(&assignedBatches).Error; err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
}
return assignedBatches, nil
}
// GetProvingStatusByHash retrieves the proving status of a batch given its hash. // GetProvingStatusByHash retrieves the proving status of a batch given its hash.
func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) { func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) {
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)

View File

@@ -54,38 +54,47 @@ func (*Bundle) TableName() string {
return "bundle" return "bundle"
} }
// GetUnassignedBundle retrieves unassigned bundle based on the specified limit. // GetUnassignedBundles retrieves unassigned bundle based on the specified limit.
// The returned batch sorts in ascending order by their index. // The returned batch sorts in ascending order by their index.
func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { func (o *Bundle) GetUnassignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) {
var bundle Bundle var bundle []*Bundle
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit)
err := db.Raw(sql).Scan(&bundle).Error err := db.Raw(sql).Scan(&bundle).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBundle error: %w", err) return nil, fmt.Errorf("Batch.GetUnassignedBundles error: %w", err)
} }
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" { return bundle, nil
return nil, nil
}
return &bundle, nil
} }
// GetAssignedBundle retrieves assigned bundle based on the specified limit. // GetUnassignedBundleCount retrieves unassigned bundle count based on the specified limit.
// The returned bundle sorts in ascending order by their index. func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) { var count int64
var bundle Bundle
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;", db = db.Model(&Bundle{})
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady)) db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("bundle.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: %w", err)
}
return count, nil
}
// GetAssignedBundles retrieves assigned bundles based on the specified limit.
// The returned bundle sorts in ascending order by their index.
func (o *Bundle) GetAssignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) {
var bundle []*Bundle
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit)
err := db.Raw(sql).Scan(&bundle).Error err := db.Raw(sql).Scan(&bundle).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Bundle.GetAssignedBatch error: %w", err) return nil, fmt.Errorf("Bundle.GetAssignedBundles error: %w", err)
} }
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" { return bundle, nil
return nil, nil
}
return &bundle, nil
} }
// GetProvingStatusByHash retrieves the proving status of a bundle given its hash. // GetProvingStatusByHash retrieves the proving status of a bundle given its hash.

View File

@@ -73,36 +73,46 @@ func (*Chunk) TableName() string {
// GetUnassignedChunk retrieves unassigned chunk based on the specified limit. // GetUnassignedChunk retrieves unassigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index. // The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height, limit uint64) ([]*Chunk, error) {
var chunk Chunk var chunks []*Chunk
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height) int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height, limit)
err := db.Raw(sql).Scan(&chunk).Error err := db.Raw(sql).Scan(&chunks).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Chunk.GetUnassignedChunk error: %w", err) return nil, fmt.Errorf("Chunk.GetUnassignedChunk error: %w", err)
} }
if chunk.Hash == "" { return chunks, nil
return nil, nil
}
return &chunk, nil
} }
// GetAssignedChunk retrieves assigned chunk based on the specified limit. // GetUnassignedChunkCount retrieves unassigned chunk count based on the specified limit.
// The returned chunks are sorted in ascending order by their index. func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (int64, error) {
func (o *Chunk) GetAssignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) { var count int64
var chunk Chunk
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", db = db.Model(&Chunk{})
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height) db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
err := db.Raw(sql).Scan(&chunk).Error db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
db = db.Where("chunk.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Chunk.GetUnassignedChunkCount error: %w", err)
}
return count, nil
}
// GetAssignedChunks retrieves assigned chunks based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetAssignedChunks(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64, limit uint64) ([]*Chunk, error) {
var chunks []*Chunk
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height, limit)
err := db.Raw(sql).Scan(&chunks).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunk error: %w", err) return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
} }
if chunk.Hash == "" { return chunks, nil
return nil, nil
}
return &chunk, nil
} }
// GetChunksByBatchHash retrieves the chunks associated with a specific batch hash. // GetChunksByBatchHash retrieves the chunks associated with a specific batch hash.

View File

@@ -148,6 +148,24 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType
return proverTasks, nil return proverTasks, nil
} }
// GetTaskOfOtherProvers get the chunk/batch task of prover
func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id", taskID)
db = db.Where("prover_public_key", proverPublicKey)
db = db.Where("prover_version", proverVersion)
db = db.Limit(1)
var proverTask ProverTask
err := db.Find(&proverTask).Error
if err != nil {
return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey)
}
return &proverTask, nil
}
// GetProvingStatusByTaskID retrieves the proving status of a prover task // GetProvingStatusByTaskID retrieves the proving status of a prover task
func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) { func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) {
db := o.db.WithContext(ctx) db := o.db.WithContext(ctx)

View File

@@ -3,7 +3,7 @@
"keystore_path": "keystore.json", "keystore_path": "keystore.json",
"keystore_password": "prover-pwd", "keystore_password": "prover-pwd",
"db_path": "unique-db-path-for-prover-1", "db_path": "unique-db-path-for-prover-1",
"prover_type": 2, "prover_types": [2],
"low_version_circuit": { "low_version_circuit": {
"hard_fork_name": "bernoulli", "hard_fork_name": "bernoulli",
"params_path": "params", "params_path": "params",

View File

@@ -30,7 +30,7 @@ pub struct Config {
pub keystore_path: String, pub keystore_path: String,
pub keystore_password: String, pub keystore_password: String,
pub db_path: String, pub db_path: String,
pub prover_type: ProverType, pub prover_types: Vec<ProverType>,
pub low_version_circuit: CircuitConfig, pub low_version_circuit: CircuitConfig,
pub high_version_circuit: CircuitConfig, pub high_version_circuit: CircuitConfig,
pub coordinator: CoordinatorConfig, pub coordinator: CoordinatorConfig,

View File

@@ -73,7 +73,7 @@ impl<'a> CoordinatorClient<'a> {
challenge: token.clone(), challenge: token.clone(),
prover_name: self.config.prover_name.clone(), prover_name: self.config.prover_name.clone(),
prover_version: crate::version::get_version(), prover_version: crate::version::get_version(),
prover_types: vec![self.config.prover_type], prover_types: self.config.prover_types.clone(),
vks: self.vks.clone(), vks: self.vks.clone(),
}; };

View File

@@ -66,7 +66,7 @@ fn start() -> Result<()> {
log::info!( log::info!(
"prover start successfully. name: {}, type: {:?}, publickey: {}, version: {}", "prover start successfully. name: {}, type: {:?}, publickey: {}, version: {}",
config.prover_name, config.prover_name,
config.prover_type, config.prover_types,
prover.get_public_key(), prover.get_public_key(),
version::get_version(), version::get_version(),
); );

View File

@@ -8,8 +8,8 @@ use crate::{
coordinator_client::{listener::Listener, types::*, CoordinatorClient}, coordinator_client::{listener::Listener, types::*, CoordinatorClient},
geth_client::GethClient, geth_client::GethClient,
key_signer::KeySigner, key_signer::KeySigner,
types::{ProofFailureType, ProofStatus, ProverType}, types::{ProofFailureType, ProofStatus, ProverType, TaskType},
utils::get_task_types, utils::{get_prover_type, get_task_types},
zk_circuits_handler::{CircuitsHandler, CircuitsHandlerProvider}, zk_circuits_handler::{CircuitsHandler, CircuitsHandlerProvider},
}; };
@@ -25,11 +25,14 @@ pub struct Prover<'a> {
impl<'a> Prover<'a> { impl<'a> Prover<'a> {
pub fn new(config: &'a Config, coordinator_listener: Box<dyn Listener>) -> Result<Self> { pub fn new(config: &'a Config, coordinator_listener: Box<dyn Listener>) -> Result<Self> {
let prover_type = config.prover_type;
let keystore_path = &config.keystore_path; let keystore_path = &config.keystore_path;
let keystore_password = &config.keystore_password; let keystore_password = &config.keystore_password;
let geth_client = if config.prover_type == ProverType::Chunk { let geth_client = if config
.prover_types
.iter()
.any(|element| *element == ProverType::Chunk)
{
Some(Rc::new(RefCell::new( Some(Rc::new(RefCell::new(
GethClient::new( GethClient::new(
&config.prover_name, &config.prover_name,
@@ -41,10 +44,10 @@ impl<'a> Prover<'a> {
None None
}; };
let provider = CircuitsHandlerProvider::new(prover_type, config, geth_client.clone()) let provider = CircuitsHandlerProvider::new(config, geth_client.clone())
.context("failed to create circuits handler provider")?; .context("failed to create circuits handler provider")?;
let vks = provider.init_vks(prover_type, config, geth_client.clone()); let vks = provider.init_vks(config.prover_types.clone(), config, geth_client.clone());
let key_signer = Rc::new(KeySigner::new(keystore_path, keystore_password)?); let key_signer = Rc::new(KeySigner::new(keystore_path, keystore_password)?);
let coordinator_client = let coordinator_client =
@@ -68,12 +71,27 @@ impl<'a> Prover<'a> {
pub fn fetch_task(&self) -> Result<Task> { pub fn fetch_task(&self) -> Result<Task> {
log::info!("[prover] start to fetch_task"); log::info!("[prover] start to fetch_task");
let task_types: Vec<TaskType> =
self.config
.prover_types
.iter()
.fold(Vec::new(), |mut acc, prover_type| {
acc.extend(get_task_types(*prover_type));
acc
});
let mut req = GetTaskRequest { let mut req = GetTaskRequest {
task_types: get_task_types(self.config.prover_type), task_types,
prover_height: None, prover_height: None,
}; };
if self.config.prover_type == ProverType::Chunk { if self
.config
.prover_types
.iter()
.any(|element| *element == ProverType::Chunk)
{
let latest_block_number = self.get_latest_block_number_value()?; let latest_block_number = self.get_latest_block_number_value()?;
if let Some(v) = latest_block_number { if let Some(v) = latest_block_number {
if v.as_u64() == 0 { if v.as_u64() == 0 {
@@ -96,11 +114,17 @@ impl<'a> Prover<'a> {
} }
pub fn prove_task(&self, task: &Task) -> Result<ProofDetail> { pub fn prove_task(&self, task: &Task) -> Result<ProofDetail> {
let prover_type = match get_prover_type(task.task_type) {
Some(pt) => Ok(pt),
None => {
bail!("unsupported prover_type.")
}
}?;
log::info!("[prover] start to prove_task, task id: {}", task.id); log::info!("[prover] start to prove_task, task id: {}", task.id);
let handler: Rc<Box<dyn CircuitsHandler>> = self let handler: Rc<Box<dyn CircuitsHandler>> = self
.circuits_handler_provider .circuits_handler_provider
.borrow_mut() .borrow_mut()
.get_circuits_handler(&task.hard_fork_name) .get_circuits_handler(&task.hard_fork_name, prover_type)
.context("failed to get circuit handler")?; .context("failed to get circuit handler")?;
self.do_prove(task, handler) self.do_prove(task, handler)
} }

View File

@@ -24,9 +24,31 @@ pub fn log_init(log_file: Option<String>) {
}); });
} }
// pub fn get_task_types(prover_types: Vec<ProverType>) -> Vec<TaskType> {
// prover_types.into_iter().fold(Vec::new(), |mut acc, prover_type| {
// match prover_type {
// ProverType::Chunk => acc.push(TaskType::Chunk),
// ProverType::Batch => {
// acc.push(TaskType::Batch);
// acc.push(TaskType::Bundle);
// }
// }
// acc
// })
// }
pub fn get_task_types(prover_type: ProverType) -> Vec<TaskType> { pub fn get_task_types(prover_type: ProverType) -> Vec<TaskType> {
match prover_type { match prover_type {
ProverType::Chunk => vec![TaskType::Chunk], ProverType::Chunk => vec![TaskType::Chunk],
ProverType::Batch => vec![TaskType::Batch, TaskType::Bundle], ProverType::Batch => vec![TaskType::Batch, TaskType::Bundle],
} }
} }
pub fn get_prover_type(task_type: TaskType) -> Option<ProverType> {
match task_type {
TaskType::Undefined => None,
TaskType::Chunk => Some(ProverType::Chunk),
TaskType::Batch => Some(ProverType::Batch),
TaskType::Bundle => Some(ProverType::Batch),
}
}

View File

@@ -34,21 +34,17 @@ type CircuitsHandlerBuilder = fn(
) -> Result<Box<dyn CircuitsHandler>>; ) -> Result<Box<dyn CircuitsHandler>>;
pub struct CircuitsHandlerProvider<'a> { pub struct CircuitsHandlerProvider<'a> {
prover_type: ProverType,
config: &'a Config, config: &'a Config,
geth_client: Option<Rc<RefCell<GethClient>>>, geth_client: Option<Rc<RefCell<GethClient>>>,
circuits_handler_builder_map: HashMap<HardForkName, CircuitsHandlerBuilder>, circuits_handler_builder_map: HashMap<HardForkName, CircuitsHandlerBuilder>,
current_fork_name: Option<HardForkName>, current_fork_name: Option<HardForkName>,
current_prover_type: Option<ProverType>,
current_circuit: Option<Rc<Box<dyn CircuitsHandler>>>, current_circuit: Option<Rc<Box<dyn CircuitsHandler>>>,
} }
impl<'a> CircuitsHandlerProvider<'a> { impl<'a> CircuitsHandlerProvider<'a> {
pub fn new( pub fn new(config: &'a Config, geth_client: Option<Rc<RefCell<GethClient>>>) -> Result<Self> {
prover_type: ProverType,
config: &'a Config,
geth_client: Option<Rc<RefCell<GethClient>>>,
) -> Result<Self> {
let mut m: HashMap<HardForkName, CircuitsHandlerBuilder> = HashMap::new(); let mut m: HashMap<HardForkName, CircuitsHandlerBuilder> = HashMap::new();
fn handler_builder( fn handler_builder(
@@ -99,11 +95,11 @@ impl<'a> CircuitsHandlerProvider<'a> {
); );
let provider = CircuitsHandlerProvider { let provider = CircuitsHandlerProvider {
prover_type,
config, config,
geth_client, geth_client,
circuits_handler_builder_map: m, circuits_handler_builder_map: m,
current_fork_name: None, current_fork_name: None,
current_prover_type: None,
current_circuit: None, current_circuit: None,
}; };
@@ -113,6 +109,7 @@ impl<'a> CircuitsHandlerProvider<'a> {
pub fn get_circuits_handler( pub fn get_circuits_handler(
&mut self, &mut self,
hard_fork_name: &String, hard_fork_name: &String,
prover_type: ProverType,
) -> Result<Rc<Box<dyn CircuitsHandler>>> { ) -> Result<Rc<Box<dyn CircuitsHandler>>> {
match &self.current_fork_name { match &self.current_fork_name {
Some(fork_name) if fork_name == hard_fork_name => { Some(fork_name) if fork_name == hard_fork_name => {
@@ -129,9 +126,10 @@ impl<'a> CircuitsHandlerProvider<'a> {
); );
if let Some(builder) = self.circuits_handler_builder_map.get(hard_fork_name) { if let Some(builder) = self.circuits_handler_builder_map.get(hard_fork_name) {
log::info!("building circuits handler for {hard_fork_name}"); log::info!("building circuits handler for {hard_fork_name}");
let handler = builder(self.prover_type, self.config, self.geth_client.clone()) let handler = builder(prover_type, self.config, self.geth_client.clone())
.expect("failed to build circuits handler"); .expect("failed to build circuits handler");
self.current_fork_name = Some(hard_fork_name.clone()); self.current_fork_name = Some(hard_fork_name.clone());
self.current_prover_type = Some(prover_type);
let rc_handler = Rc::new(handler); let rc_handler = Rc::new(handler);
self.current_circuit = Some(rc_handler.clone()); self.current_circuit = Some(rc_handler.clone());
Ok(rc_handler) Ok(rc_handler)
@@ -144,19 +142,23 @@ impl<'a> CircuitsHandlerProvider<'a> {
pub fn init_vks( pub fn init_vks(
&self, &self,
prover_type: ProverType, prover_types: Vec<ProverType>,
config: &'a Config, config: &'a Config,
geth_client: Option<Rc<RefCell<GethClient>>>, geth_client: Option<Rc<RefCell<GethClient>>>,
) -> Vec<String> { ) -> Vec<String> {
self.circuits_handler_builder_map self.circuits_handler_builder_map
.iter() .iter()
.flat_map(|(hard_fork_name, build)| { .flat_map(|(hard_fork_name, build)| {
let handler = build(prover_type, config, geth_client.clone()) let geth_client_clone = geth_client.clone();
prover_types
.iter()
.flat_map(move |prover_type| {
let handler = build(*prover_type, config, geth_client_clone.clone())
.expect("failed to build circuits handler"); .expect("failed to build circuits handler");
get_task_types(prover_type) get_task_types(*prover_type)
.into_iter() .into_iter()
.map(|task_type| { .map(move |task_type| {
let vk = handler let vk = handler
.get_vk(task_type) .get_vk(task_type)
.map_or("".to_string(), utils::encode_vk); .map_or("".to_string(), utils::encode_vk);
@@ -170,5 +172,7 @@ impl<'a> CircuitsHandlerProvider<'a> {
.collect::<Vec<String>>() .collect::<Vec<String>>()
}) })
.collect::<Vec<String>>() .collect::<Vec<String>>()
})
.collect()
} }
} }