fix coordiantor assign bug (#1621)

Co-authored-by: georgehao <georgehao@users.noreply.github.com>
Co-authored-by: colinlyguo <colinlyguo@scroll.io>
This commit is contained in:
georgehao
2025-03-12 15:13:10 +08:00
committed by GitHub
parent 228cba48b7
commit 13c8605211
15 changed files with 142 additions and 120 deletions

View File

@@ -31,12 +31,12 @@ fmt: ## Format the code
cd $(PWD)/rollup/ && go mod tidy
cd $(PWD)/tests/integration-test/ && go mod tidy
goimports -local $(PWD)/bridge-history-api/ -w .
goimports -local $(PWD)/common/ -w .
goimports -local $(PWD)/coordinator/ -w .
goimports -local $(PWD)/database/ -w .
goimports -local $(PWD)/rollup/ -w .
goimports -local $(PWD)/tests/integration-test/ -w .
goimports -local scroll-tech/bridge-history-api/ -w .
goimports -local scroll-tech/common/ -w .
goimports -local scroll-tech/coordinator/ -w .
goimports -local scroll-tech/database/ -w .
goimports -local scroll-tech/rollup/ -w .
goimports -local scroll-tech/tests/integration-test/ -w .
dev_docker: ## Build docker images for development/testing usages
docker pull postgres

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.4.95"
var tag = "v4.4.96"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -46,7 +46,7 @@ func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
hardForkNames, err := a.loginLogic.ProverHardForkName(&login)
if err != nil {
return "", fmt.Errorf("prover hard name failure:%w", err)
return "", fmt.Errorf("prover hard fork name failure:%w", err)
}
// check the challenge is used, if used, return failure

View File

@@ -77,6 +77,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}
var batchTask *orm.Batch
var hardForkName string
for i := 0; i < 5; i++ {
var getTaskError error
var tmpBatchTask *orm.Batch
@@ -101,10 +102,20 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}
taskCtx.taskType = message.ProofTypeBatch
taskCtx.batchTask = tmpBatchTask
var checkErr error
hardForkName, checkErr = bp.hardForkSanityCheck(ctx, taskCtx)
if checkErr != nil {
log.Debug("hard fork sanity check failed", "height", getTaskParameter.ProverHeight, "err", checkErr)
return nil, nil
}
// Don't dispatch the same failing job to the same prover
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
if getTaskError != nil {
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "error", getTaskError)
proverTasks, getFailedTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
if getFailedTaskError != nil {
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "error", getFailedTaskError)
return nil, ErrCoordinatorInternalFailure
}
for i := 0; i < len(proverTasks); i++ {
@@ -135,22 +146,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}
hardForkName, getHardForkErr := bp.hardForkName(ctx, batchTask)
if getHardForkErr != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("retrieve hard fork name by batch failed", "task_id", batchTask.Hash, "err", getHardForkErr)
return nil, ErrCoordinatorInternalFailure
}
if _, ok := taskCtx.HardForkNames[hardForkName]; !ok {
bp.recoverActiveAttempts(ctx, batchTask)
log.Debug("incompatible prover version",
"requisite hard fork name", hardForkName,
"prover hard fork name", taskCtx.HardForkNames,
"task_id", batchTask.Hash)
return nil, nil
}
log.Info("start batch proof generation session", "task_id", batchTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
proverTask := orm.ProverTask{
TaskID: batchTask.Hash,
@@ -188,20 +183,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return taskMsg, nil
}
func (bp *BatchProverTask) hardForkName(ctx *gin.Context, batchTask *orm.Batch) (string, error) {
startChunk, getChunkErr := bp.chunkOrm.GetChunkByHash(ctx, batchTask.StartChunkHash)
if getChunkErr != nil {
return "", getChunkErr
}
l2Block, getBlockErr := bp.blockOrm.GetL2BlockByNumber(ctx.Copy(), startChunk.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(bp.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
}
func (bp *BatchProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, batch *orm.Batch, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
// get chunk from db
chunks, err := bp.chunkOrm.GetChunksByBatchHash(ctx, task.TaskID)

View File

@@ -9,19 +9,18 @@ import (
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"gorm.io/gorm"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
cutils "scroll-tech/coordinator/internal/utils"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils"
)
// BundleProverTask is prover task implement for bundle proof
@@ -77,6 +76,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
}
var bundleTask *orm.Bundle
var hardForkName string
for i := 0; i < 5; i++ {
var getTaskError error
var tmpBundleTask *orm.Bundle
@@ -101,6 +101,16 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
return nil, nil
}
taskCtx.taskType = message.ProofTypeBundle
taskCtx.bundleTask = tmpBundleTask
var checkErr error
hardForkName, checkErr = bp.hardForkSanityCheck(ctx, taskCtx)
if checkErr != nil {
log.Debug("hard fork sanity check failed", "height", getTaskParameter.ProverHeight, "err", checkErr)
return nil, nil
}
// Don't dispatch the same failing job to the same prover
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2)
if getTaskError != nil {
@@ -135,22 +145,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
return nil, nil
}
hardForkName, getHardForkErr := bp.hardForkName(ctx, bundleTask)
if getHardForkErr != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("retrieve hard fork name by bundle failed", "task_id", bundleTask.Hash, "err", getHardForkErr)
return nil, ErrCoordinatorInternalFailure
}
if _, ok := taskCtx.HardForkNames[hardForkName]; !ok {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Debug("incompatible prover version",
"requisite hard fork name", hardForkName,
"prover hard fork name", taskCtx.HardForkNames,
"task_id", bundleTask.Hash)
return nil, nil
}
log.Info("start bundle proof generation session", "task index", bundleTask.Index, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
proverTask := orm.ProverTask{
TaskID: bundleTask.Hash,
@@ -188,26 +182,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
return taskMsg, nil
}
func (bp *BundleProverTask) hardForkName(ctx *gin.Context, bundleTask *orm.Bundle) (string, error) {
startBatch, getBatchErr := bp.batchOrm.GetBatchByHash(ctx, bundleTask.StartBatchHash)
if getBatchErr != nil {
return "", getBatchErr
}
startChunk, getChunkErr := bp.chunkOrm.GetChunkByHash(ctx, startBatch.StartChunkHash)
if getChunkErr != nil {
return "", getChunkErr
}
l2Block, getBlockErr := bp.blockOrm.GetL2BlockByNumber(ctx.Copy(), startChunk.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(bp.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
}
func (bp *BundleProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
// get bundle from db
batches, err := bp.batchOrm.GetBatchesByBundleHash(ctx, task.TaskID)

View File

@@ -9,7 +9,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"gorm.io/gorm"
@@ -75,6 +74,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}
var chunkTask *orm.Chunk
var hardForkName string
for i := 0; i < 5; i++ {
var getTaskError error
var tmpChunkTask *orm.Chunk
@@ -99,10 +99,20 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}
taskCtx.taskType = message.ProofTypeChunk
taskCtx.chunkTask = tmpChunkTask
var checkErr error
hardForkName, checkErr = cp.hardForkSanityCheck(ctx, taskCtx)
if checkErr != nil {
log.Debug("hard fork sanity check failed", "height", getTaskParameter.ProverHeight, "err", checkErr)
return nil, nil
}
// Don't dispatch the same failing job to the same prover
proverTasks, getTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
if getTaskError != nil {
log.Error("failed to get prover tasks", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "error", getTaskError)
proverTasks, getFailedTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
if getFailedTaskError != nil {
log.Error("failed to get prover tasks", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "error", getFailedTaskError)
return nil, ErrCoordinatorInternalFailure
}
for i := 0; i < len(proverTasks); i++ {
@@ -133,22 +143,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}
hardForkName, getHardForkErr := cp.hardForkName(ctx, chunkTask)
if getHardForkErr != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("retrieve hard fork name by chunk failed", "task_id", chunkTask.Hash, "err", getHardForkErr)
return nil, ErrCoordinatorInternalFailure
}
if _, ok := taskCtx.HardForkNames[hardForkName]; !ok {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Debug("incompatible prover version",
"requisite hard fork name", hardForkName,
"prover hard fork name", taskCtx.HardForkNames,
"task_id", chunkTask.Hash)
return nil, nil
}
log.Info("start chunk generation session", "task_id", chunkTask.Hash, "public key", taskCtx.PublicKey, "prover name", taskCtx.ProverName)
proverTask := orm.ProverTask{
TaskID: chunkTask.Hash,
@@ -185,15 +179,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return taskMsg, nil
}
func (cp *ChunkProverTask) hardForkName(ctx *gin.Context, chunkTask *orm.Chunk) (string, error) {
l2Block, getBlockErr := cp.blockOrm.GetL2BlockByNumber(ctx.Copy(), chunkTask.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(cp.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
}
func (cp *ChunkProverTask) formatProverTask(ctx context.Context, task *orm.ProverTask, hardForkName string) (*coordinatorType.GetTaskSchema, error) {
// Get block hashes.
blockHashes, dbErr := cp.blockOrm.GetL2BlockHashesByChunkHash(ctx, task.TaskID)

View File

@@ -9,9 +9,12 @@ import (
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/params"
"gorm.io/gorm"
"scroll-tech/common/types/message"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
@@ -52,6 +55,76 @@ type proverTaskContext struct {
ProverVersion string
ProverProviderType uint8
HardForkNames map[string]struct{}
taskType message.ProofType
chunkTask *orm.Chunk
batchTask *orm.Batch
bundleTask *orm.Bundle
}
// hardForkName get the chunk/batch/bundle hard fork name
func (b *BaseProverTask) hardForkName(ctx *gin.Context, taskCtx *proverTaskContext) (string, error) {
switch {
case taskCtx.taskType == message.ProofTypeChunk:
if taskCtx.chunkTask == nil {
return "", errors.New("chunk task is nil")
}
l2Block, getBlockErr := b.blockOrm.GetL2BlockByNumber(ctx.Copy(), taskCtx.chunkTask.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(b.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
case taskCtx.taskType == message.ProofTypeBatch:
if taskCtx.batchTask == nil {
return "", errors.New("batch task is nil")
}
startChunk, getChunkErr := b.chunkOrm.GetChunkByHash(ctx, taskCtx.batchTask.StartChunkHash)
if getChunkErr != nil {
return "", getChunkErr
}
l2Block, getBlockErr := b.blockOrm.GetL2BlockByNumber(ctx.Copy(), startChunk.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(b.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
case taskCtx.taskType == message.ProofTypeBundle:
if taskCtx.bundleTask == nil {
return "", errors.New("bundle task is nil")
}
startBatch, getBatchErr := b.batchOrm.GetBatchByHash(ctx, taskCtx.bundleTask.StartBatchHash)
if getBatchErr != nil {
return "", getBatchErr
}
startChunk, getChunkErr := b.chunkOrm.GetChunkByHash(ctx, startBatch.StartChunkHash)
if getChunkErr != nil {
return "", getChunkErr
}
l2Block, getBlockErr := b.blockOrm.GetL2BlockByNumber(ctx.Copy(), startChunk.StartBlockNumber)
if getBlockErr != nil {
return "", getBlockErr
}
hardForkName := encoding.GetHardforkName(b.chainCfg, l2Block.Number, l2Block.BlockTimestamp)
return hardForkName, nil
default:
return "", errors.New("illegal task type")
}
}
// hardForkSanityCheck check the task's hard fork name is the same as prover
func (b *BaseProverTask) hardForkSanityCheck(ctx *gin.Context, taskCtx *proverTaskContext) (string, error) {
hardForkName, getHardForkErr := b.hardForkName(ctx, taskCtx)
if getHardForkErr != nil {
return "", getHardForkErr
}
if _, ok := taskCtx.HardForkNames[hardForkName]; !ok {
return "", errors.New("to be assigned prover task's hard-fork name is not the same as prover")
}
return hardForkName, nil
}
// checkParameter check the prover task parameter illegal

View File

@@ -9,11 +9,12 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/testcontainers"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils"
"scroll-tech/database/migrate"
)
var (

View File

@@ -20,11 +20,12 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/testcontainers"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/version"
"scroll-tech/database/migrate"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/controller/api"

View File

@@ -3,9 +3,10 @@ package config
import (
"fmt"
"reflect"
"scroll-tech/common/database"
"strings"
"scroll-tech/common/database"
"github.com/mitchellh/mapstructure"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/rpc"

View File

@@ -27,9 +27,10 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/testcontainers"
"scroll-tech/common/types"
"scroll-tech/database/migrate"
bridgeAbi "scroll-tech/rollup/abi"
"scroll-tech/rollup/internal/config"

View File

@@ -11,9 +11,10 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/database"
"scroll-tech/common/testcontainers"
"scroll-tech/database/migrate"
"scroll-tech/rollup/internal/config"
)

View File

@@ -13,10 +13,11 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/testcontainers"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/database/migrate"
"scroll-tech/rollup/internal/utils"
)

View File

@@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/database/migrate"
"scroll-tech/common/database"
tc "scroll-tech/common/testcontainers"
"scroll-tech/common/utils"
"scroll-tech/database/migrate"
bcmd "scroll-tech/rollup/cmd"
"scroll-tech/rollup/mock_bridge"

View File

@@ -12,11 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"scroll-tech/integration-test/orm"
"scroll-tech/database/migrate"
"scroll-tech/common/testcontainers"
"scroll-tech/common/utils"
"scroll-tech/common/version"
"scroll-tech/database/migrate"
"scroll-tech/integration-test/orm"
capp "scroll-tech/coordinator/cmd/api/app"