mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 15:38:18 -05:00
Compare commits
2 Commits
v4.3.74-pe
...
feat/optim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22f546ddbb | ||
|
|
92ca47d35c |
@@ -51,8 +51,7 @@ func InitDB(config *Config) (*gorm.DB, error) {
|
||||
}
|
||||
|
||||
db, err := gorm.Open(postgres.Open(config.DSN), &gorm.Config{
|
||||
PrepareStmt: false,
|
||||
Logger: &tmpGormLogger,
|
||||
Logger: &tmpGormLogger,
|
||||
NowFunc: func() time.Time {
|
||||
// why set time to UTC.
|
||||
// if now set this, the inserted data time will use local timezone. like 2023-07-18 18:24:00 CST+8
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
@@ -63,47 +65,55 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
|
||||
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
|
||||
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
|
||||
|
||||
tmpChunkTask, getTaskError := cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
|
||||
if getTaskError != nil {
|
||||
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
|
||||
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
|
||||
if tmpChunkTask == nil {
|
||||
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
|
||||
var chunkTask *orm.Chunk
|
||||
for i := 0; i < 5; i++ {
|
||||
var getTaskError error
|
||||
var tmpChunkTask *orm.Chunk
|
||||
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
|
||||
if getTaskError != nil {
|
||||
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
|
||||
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
|
||||
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
|
||||
if tmpChunkTask == nil {
|
||||
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
|
||||
if getTaskError != nil {
|
||||
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
}
|
||||
|
||||
if tmpChunkTask == nil {
|
||||
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
|
||||
if updateAttemptsErr != nil {
|
||||
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
chunkTask = tmpChunkTask
|
||||
break
|
||||
}
|
||||
|
||||
if tmpChunkTask == nil {
|
||||
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
|
||||
if updateAttemptsErr != nil {
|
||||
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if tmpChunkTask == nil {
|
||||
if chunkTask == nil {
|
||||
log.Debug("get empty unassigned chunk after retry 5 times", "height", getTaskParameter.ProverHeight)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Info("start chunk generation session", "id", tmpChunkTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
|
||||
log.Info("start chunk generation session", "id", chunkTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
|
||||
|
||||
proverTask := orm.ProverTask{
|
||||
TaskID: tmpChunkTask.Hash,
|
||||
TaskID: chunkTask.Hash,
|
||||
ProverPublicKey: taskCtx.PublicKey,
|
||||
TaskType: int16(message.ProofTypeChunk),
|
||||
ProverName: taskCtx.ProverName,
|
||||
@@ -115,15 +125,15 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
}
|
||||
|
||||
if err = cp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
|
||||
cp.recoverActiveAttempts(ctx, tmpChunkTask)
|
||||
log.Error("insert chunk prover task fail", "taskID", tmpChunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Error("insert chunk prover task fail", "taskID", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
taskMsg, err := cp.formatProverTask(ctx, &proverTask)
|
||||
if err != nil {
|
||||
cp.recoverActiveAttempts(ctx, tmpChunkTask)
|
||||
log.Error("format prover task failure", "hash", tmpChunkTask.Hash, "err", err)
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Error("format prover task failure", "hash", chunkTask.Hash, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
|
||||
@@ -399,9 +399,5 @@ func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, proverTa
|
||||
if len(proofBytes) == 0 || marshalErr != nil {
|
||||
return fmt.Errorf("updateProverTaskProof marshal proof error:%w", marshalErr)
|
||||
}
|
||||
// only info this, don't insert to db
|
||||
log.Info("prover task pprof submit", "proverTask UUID", proverTask.UUID, "task id", proverTask.TaskID,
|
||||
"public key", proverTask.ProverPublicKey, "prover name", proverTask.ProverName, "proof", proofBytes)
|
||||
return nil
|
||||
//return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
|
||||
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
|
||||
}
|
||||
|
||||
@@ -73,20 +73,16 @@ func (*Batch) TableName() string {
|
||||
// GetUnassignedBatch retrieves unassigned batch based on the specified limit.
|
||||
// The returned batch are sorted in ascending order by their index.
|
||||
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
|
||||
db = db.Where("total_attempts < ?", maxTotalAttempts)
|
||||
db = db.Where("active_attempts < ?", maxActiveAttempts)
|
||||
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
|
||||
|
||||
var batch Batch
|
||||
err := db.First(&batch).Error
|
||||
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
db := o.db.WithContext(ctx)
|
||||
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
|
||||
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
|
||||
err := db.Raw(sql).Scan(&batch).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
|
||||
return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err)
|
||||
}
|
||||
if batch.Hash == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return &batch, nil
|
||||
}
|
||||
@@ -94,20 +90,16 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo
|
||||
// GetAssignedBatch retrieves assigned batch based on the specified limit.
|
||||
// The returned batch are sorted in ascending order by their index.
|
||||
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
|
||||
db = db.Where("total_attempts < ?", maxTotalAttempts)
|
||||
db = db.Where("active_attempts < ?", maxActiveAttempts)
|
||||
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
|
||||
|
||||
var batch Batch
|
||||
err := db.First(&batch).Error
|
||||
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
db := o.db.WithContext(ctx)
|
||||
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
|
||||
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
|
||||
err := db.Raw(sql).Scan(&batch).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
|
||||
return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err)
|
||||
}
|
||||
if batch.Hash == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return &batch, nil
|
||||
}
|
||||
|
||||
@@ -71,28 +71,10 @@ func (*Chunk) TableName() string {
|
||||
// GetUnassignedChunk retrieves unassigned chunk based on the specified limit.
|
||||
// The returned chunks are sorted in ascending order by their index.
|
||||
func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
|
||||
//db := o.db.WithContext(ctx)
|
||||
//db = db.Model(&Chunk{})
|
||||
//db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
|
||||
//db = db.Where("active_attempts = 0")
|
||||
//db = db.Where("total_attempts < ?", maxTotalAttempts)
|
||||
////db = db.Where("active_attempts < ?", maxActiveAttempts)
|
||||
//db = db.Where("end_block_number <= ?", height)
|
||||
//
|
||||
//var chunk Chunk
|
||||
//err := db.First(&chunk).Error
|
||||
//if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// return nil, nil
|
||||
//}
|
||||
//
|
||||
//if err != nil {
|
||||
// return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
|
||||
//}
|
||||
//return &chunk, nil
|
||||
|
||||
var chunk Chunk
|
||||
db := o.db.WithContext(ctx)
|
||||
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = 1 AND active_attempts = 0 AND total_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", maxTotalAttempts, height)
|
||||
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
|
||||
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height)
|
||||
err := db.Raw(sql).Scan(&chunk).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
|
||||
@@ -106,28 +88,10 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAtt
|
||||
// GetAssignedChunk retrieves assigned chunk based on the specified limit.
|
||||
// The returned chunks are sorted in ascending order by their index.
|
||||
func (o *Chunk) GetAssignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
|
||||
//db := o.db.WithContext(ctx)
|
||||
//db = db.Model(&Chunk{})
|
||||
//db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
|
||||
//db = db.Where("active_attempts = 0")
|
||||
//db = db.Where("total_attempts < ?", maxTotalAttempts)
|
||||
////db = db.Where("active_attempts < ?", maxActiveAttempts)
|
||||
//db = db.Where("end_block_number <= ?", height)
|
||||
//
|
||||
//var chunk Chunk
|
||||
//err := db.First(&chunk).Error
|
||||
//if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// return nil, nil
|
||||
//}
|
||||
//
|
||||
//if err != nil {
|
||||
// return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
|
||||
//}
|
||||
//return &chunk, nil
|
||||
|
||||
var chunk Chunk
|
||||
db := o.db.WithContext(ctx)
|
||||
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = 2 AND active_attempts = 0 AND total_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", maxTotalAttempts, height)
|
||||
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
|
||||
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height)
|
||||
err := db.Raw(sql).Scan(&chunk).Error
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
|
||||
|
||||
@@ -2082,7 +2082,6 @@ gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa
|
||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c=
|
||||
gotest.tools v1.4.0/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY=
|
||||
gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||
|
||||
Reference in New Issue
Block a user