Compare commits


2 Commits

Author     SHA1        Message                Date
georgehao  22f546ddbb  feat: update           2024-04-01 22:26:12 +08:00
georgehao  92ca47d35c  feat: optimize pg sql  2024-04-01 22:06:59 +08:00
6 changed files with 66 additions and 106 deletions

View File

@@ -51,8 +51,7 @@ func InitDB(config *Config) (*gorm.DB, error) {
}
db, err := gorm.Open(postgres.Open(config.DSN), &gorm.Config{
PrepareStmt: false,
Logger: &tmpGormLogger,
NowFunc: func() time.Time {
// why set time to UTC?
// if not set, inserted timestamps use the local timezone, e.g. 2023-07-18 18:24:00 CST+8
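The hunk above pins GORM's NowFunc to UTC so that gorm-managed timestamps are stored timezone-independently. A minimal sketch of the idea as a standalone helper; the openUTCDB name, the package name, and logger.Default are illustrative assumptions, not the repo's code:

package database

import (
	"time"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

// openUTCDB opens a Postgres-backed *gorm.DB whose NowFunc returns UTC.
func openUTCDB(dsn string) (*gorm.DB, error) {
	return gorm.Open(postgres.Open(dsn), &gorm.Config{
		Logger: logger.Default,
		NowFunc: func() time.Time {
			// Without this, gorm-filled timestamps carry the local timezone,
			// e.g. 2023-07-18 18:24:00 CST+8.
			return time.Now().UTC()
		},
	})
}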

View File

@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -63,47 +65,55 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
tmpChunkTask, getTaskError := cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
// Why query again here? To let one task be assigned to multiple provers, chunks already in `ProvingTaskAssigned`
// status must also be handed out. A single query with `proving_status in (1, 2)` would not use the postgres index, so the SQL is split into two queries.
if tmpChunkTask == nil {
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
var chunkTask *orm.Chunk
for i := 0; i < 5; i++ {
var getTaskError error
var tmpChunkTask *orm.Chunk
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
// Why query again here? To let one task be assigned to multiple provers, chunks already in `ProvingTaskAssigned`
// status must also be handed out. A single query with `proving_status in (1, 2)` would not use the postgres index, so the SQL is split into two queries.
if tmpChunkTask == nil {
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
}
if tmpChunkTask == nil {
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
continue
}
chunkTask = tmpChunkTask
break
}
if tmpChunkTask == nil {
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
if rowsAffected == 0 {
return nil, nil
}
if tmpChunkTask == nil {
if chunkTask == nil {
log.Debug("get empty unassigned chunk after retry 5 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}
log.Info("start chunk generation session", "id", tmpChunkTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
log.Info("start chunk generation session", "id", chunkTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
proverTask := orm.ProverTask{
TaskID: tmpChunkTask.Hash,
TaskID: chunkTask.Hash,
ProverPublicKey: taskCtx.PublicKey,
TaskType: int16(message.ProofTypeChunk),
ProverName: taskCtx.ProverName,
@@ -115,15 +125,15 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}
if err = cp.proverTaskOrm.InsertProverTask(ctx, &proverTask); err != nil {
cp.recoverActiveAttempts(ctx, tmpChunkTask)
log.Error("insert chunk prover task fail", "taskID", tmpChunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("insert chunk prover task fail", "taskID", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
taskMsg, err := cp.formatProverTask(ctx, &proverTask)
if err != nil {
cp.recoverActiveAttempts(ctx, tmpChunkTask)
log.Error("format prover task failure", "hash", tmpChunkTask.Hash, "err", err)
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("format prover task failure", "hash", chunkTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
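The change above wraps the chunk lookup in a bounded retry: fetch a candidate (assigned first, then unassigned), try to claim it by bumping its attempt counters, and when the conditional update touches zero rows because another prover won the race, sleep 100ms and try again, up to 5 times. A minimal sketch of that claim-with-retry pattern, with illustrative names (Task, Store, claimTask) rather than the coordinator's actual types:

package example

import (
	"context"
	"time"
)

// Task and Store are stand-ins for the coordinator's chunk row and ORM.
type Task struct {
	ID             int64
	ActiveAttempts int16
	TotalAttempts  int16
}

type Store interface {
	// NextCandidate returns an already-assigned task that can take another
	// prover, otherwise an unassigned one; nil means nothing is available.
	NextCandidate(ctx context.Context) (*Task, error)
	// BumpAttempts increments the attempt counters only if they still equal the
	// values read earlier, returning false when another worker claimed the task.
	BumpAttempts(ctx context.Context, id int64, active, total int16) (bool, error)
}

// claimTask retries the optimistic claim a few times before giving up.
func claimTask(ctx context.Context, s Store) (*Task, error) {
	for i := 0; i < 5; i++ {
		candidate, err := s.NextCandidate(ctx)
		if err != nil {
			return nil, err
		}
		if candidate == nil {
			return nil, nil // no work right now
		}
		claimed, err := s.BumpAttempts(ctx, candidate.ID, candidate.ActiveAttempts, candidate.TotalAttempts)
		if err != nil {
			return nil, err
		}
		if !claimed {
			time.Sleep(100 * time.Millisecond) // lost the race; back off briefly
			continue
		}
		return candidate, nil
	}
	return nil, nil // treated as "no task" after 5 lost races
}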

View File

@@ -399,9 +399,5 @@ func (m *ProofReceiverLogic) updateProverTaskProof(ctx context.Context, proverTa
if len(proofBytes) == 0 || marshalErr != nil {
return fmt.Errorf("updateProverTaskProof marshal proof error:%w", marshalErr)
}
// only info this, don't insert to db
log.Info("prover task pprof submit", "proverTask UUID", proverTask.UUID, "task id", proverTask.TaskID,
"public key", proverTask.ProverPublicKey, "prover name", proverTask.ProverName, "proof", proofBytes)
return nil
//return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
return m.proverTaskOrm.UpdateProverTaskProof(ctx, proverTask.UUID, proofBytes)
}

View File

@@ -73,20 +73,16 @@ func (*Batch) TableName() string {
// GetUnassignedBatch retrieves an unassigned batch based on the specified limits.
// Batches are considered in ascending order of their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
var batch Batch
err := db.First(&batch).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
err := db.Raw(sql).Scan(&batch).Error
if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err)
}
if batch.Hash == "" {
return nil, nil
}
return &batch, nil
}
@@ -94,20 +90,16 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo
// GetAssignedBatch retrieves an assigned batch based on the specified limits.
// Batches are considered in ascending order of their index.
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
var batch Batch
err := db.First(&batch).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
err := db.Raw(sql).Scan(&batch).Error
if err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err)
}
if batch.Hash == "" {
return nil, nil
}
return &batch, nil
}
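Both batch lookups above replace a chained query-builder call (where gorm.ErrRecordNotFound signalled "no row") with a raw SELECT per proving_status value, a LIMIT 1, and an empty-Hash check, so each query filters on a single status equality. A sketch of the same query written once with bind parameters instead of fmt.Sprintf; the getBatchByStatus helper is hypothetical, assumes the file's existing imports, and assumes the status enum is named types.ProvingStatus:

// getBatchByStatus is a hypothetical consolidation of GetUnassignedBatch and
// GetAssignedBatch, not repo code. Raw + Scan never returns
// gorm.ErrRecordNotFound, so "no row" is detected via the zero-valued Hash.
func (o *Batch) getBatchByStatus(ctx context.Context, status types.ProvingStatus, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
	var batch Batch
	db := o.db.WithContext(ctx)
	err := db.Raw(
		"SELECT * FROM batch WHERE proving_status = ? AND total_attempts < ? AND active_attempts < ? AND chunk_proofs_status = ? AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
		int(status), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady),
	).Scan(&batch).Error
	if err != nil {
		return nil, fmt.Errorf("Batch.getBatchByStatus error: %w", err)
	}
	if batch.Hash == "" {
		return nil, nil // no matching row
	}
	return &batch, nil
}

The two exported getters would then reduce to calls with types.ProvingTaskUnassigned and types.ProvingTaskAssigned respectively.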

View File

@@ -71,28 +71,10 @@ func (*Chunk) TableName() string {
// GetUnassignedChunk retrieves an unassigned chunk based on the specified limits.
// Chunks are considered in ascending order of their index.
func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
//db := o.db.WithContext(ctx)
//db = db.Model(&Chunk{})
//db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
//db = db.Where("active_attempts = 0")
//db = db.Where("total_attempts < ?", maxTotalAttempts)
////db = db.Where("active_attempts < ?", maxActiveAttempts)
//db = db.Where("end_block_number <= ?", height)
//
//var chunk Chunk
//err := db.First(&chunk).Error
//if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
// return nil, nil
//}
//
//if err != nil {
// return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
//}
//return &chunk, nil
var chunk Chunk
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = 1 AND active_attempts = 0 AND total_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", maxTotalAttempts, height)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height)
err := db.Raw(sql).Scan(&chunk).Error
if err != nil {
return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
@@ -106,28 +88,10 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAtt
// GetAssignedChunk retrieves an assigned chunk based on the specified limits.
// Chunks are considered in ascending order of their index.
func (o *Chunk) GetAssignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
//db := o.db.WithContext(ctx)
//db = db.Model(&Chunk{})
//db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
//db = db.Where("active_attempts = 0")
//db = db.Where("total_attempts < ?", maxTotalAttempts)
////db = db.Where("active_attempts < ?", maxActiveAttempts)
//db = db.Where("end_block_number <= ?", height)
//
//var chunk Chunk
//err := db.First(&chunk).Error
//if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
// return nil, nil
//}
//
//if err != nil {
// return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
//}
//return &chunk, nil
var chunk Chunk
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = 2 AND active_attempts = 0 AND total_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;", maxTotalAttempts, height)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height)
err := db.Raw(sql).Scan(&chunk).Error
if err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)

View File

@@ -2082,7 +2082,6 @@ gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c=
gotest.tools v1.4.0/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY=
gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=