Compare commits


2 Commits

Author: Mengran Lan
SHA1:   e45838f3ac
Date:   2024-06-05 14:35:48 +08:00
Message:
    feat(coordinator): coordinator support multi-circuits prover (#1351)
    Co-authored-by: georgehao <haohongfan@gmail.com>
    Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
    Co-authored-by: colinlyguo <colinlyguo@scroll.io>

Author: petercover
SHA1:   acd1432d44
Date:   2024-06-05 10:35:54 +08:00
Message:
    chore: fix function name in comment (#1335)
    Signed-off-by: petercover <raowanxiang@outlook.com>
    Co-authored-by: georgehao <haohongfan@gmail.com>
12 changed files with 31 additions and 278 deletions

View File

@@ -154,11 +154,10 @@ func (t *TestcontainerApps) GetL2GethEndPoint() (string, error) {
// GetGormDBClient returns a gorm.DB by connecting to the running postgres container
func (t *TestcontainerApps) GetGormDBClient() (*gorm.DB, error) {
// endpoint, err := t.GetDBEndPoint()
// if err != nil {
// return nil, err
// }
endpoint := "postgres://lmr:@localhost:5432/unittest?sslmode=disable"
endpoint, err := t.GetDBEndPoint()
if err != nil {
return nil, err
}
dbCfg := &database.Config{
DSN: endpoint,
DriverName: "postgres",

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.4.9"
var tag = "v4.4.10"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
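
Note: this hunk bumps the prover-version tag from v4.4.9 to v4.4.10; the commit suffix is filled in at runtime from build metadata, but the diff view cuts the function off here. Below is a minimal sketch of that pattern, assuming the function reads the vcs.revision setting embedded by the Go toolchain; the truncation length and fallback value are assumptions, not the repository's actual implementation.

package version

import "runtime/debug"

var tag = "v4.4.10"

// commit resolves the short git revision from the build info that the Go
// toolchain (1.18+) embeds when the binary is built inside a git checkout.
var commit = func() string {
	if info, ok := debug.ReadBuildInfo(); ok {
		for _, setting := range info.Settings {
			if setting.Key == "vcs.revision" && len(setting.Value) >= 7 {
				return setting.Value[:7] // assumed short-SHA length
			}
		}
	}
	return "unknown" // assumed fallback when no VCS metadata is present
}()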

View File

@@ -18,7 +18,6 @@ import (
"scroll-tech/coordinator/internal/logic/provertask"
"scroll-tech/coordinator/internal/logic/verifier"
coordinatorType "scroll-tech/coordinator/internal/types"
itypes "scroll-tech/coordinator/internal/types"
)
// GetTaskController the get prover task api controller
@@ -71,10 +70,6 @@ func (ptc *GetTaskController) incGetTaskAccessCounter(ctx *gin.Context) error {
// GetTasks get assigned chunk/batch task
func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
ctx.Set(itypes.PublicKey, "fake_public_key2")
ctx.Set(itypes.ProverName, "test")
ctx.Set(itypes.ProverVersion, "v4.4.9-000000-000000-000000")
var getTaskParameter coordinatorType.GetTaskParameter
if err := ctx.ShouldBind(&getTaskParameter); err != nil {
nerr := fmt.Errorf("prover task parameter invalid, err:%w", err)
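
Note: the three ctx.Set calls dropped in this hunk hard-wired a fake prover identity into the request context so the handler could be exercised with authentication switched off. Without them, those keys come only from the authentication layer. The sketch below assumes the coordinator's login middleware is the gin-jwt one suggested by the MiddlewareFunc/LoginHandler calls later in this diff; the context-key string values and the claim layout are assumptions.

package main

import (
	jwt "github.com/appleboy/gin-jwt/v2"
	"github.com/gin-gonic/gin"
)

// Assumed values of the context-key constants referenced in the diff
// (types.PublicKey, types.ProverName, types.ProverVersion).
const (
	publicKey     = "public_key"
	proverName    = "prover_name"
	proverVersion = "prover_version"
)

// identityFromClaims copies the prover's identity from the verified JWT
// claims onto the gin context, which is what the removed debug ctx.Set
// lines were faking. It must run after the JWT middleware has parsed and
// verified the token.
func identityFromClaims() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		claims := jwt.ExtractClaims(ctx)
		for _, key := range []string{publicKey, proverName, proverVersion} {
			if v, ok := claims[key]; ok {
				ctx.Set(key, v)
			}
		}
		ctx.Next()
	}
}

func main() {
	r := gin.New()
	// In the real router this sits behind loginMiddleware.MiddlewareFunc().
	r.Use(identityFromClaims())
	_ = r // handlers such as /get_task and /submit_proof would be registered here
}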

View File

@@ -82,7 +82,7 @@ func (r *chunkIndexRange) merge(o chunkIndexRange) *chunkIndexRange {
}
func (r *chunkIndexRange) contains(start, end uint64) bool {
return r.start <= start && r.end >= end+1
return r.start <= start && r.end > end
}
type getHardForkNameByBatchFunc func(*orm.Batch) (string, error)
@@ -146,7 +146,7 @@ func (bp *BatchProverTask) doAssignTaskWithinChunkRange(ctx *gin.Context, taskCt
if getHardForkName != nil {
hardForkName, err = getHardForkName(batchTask)
if err != nil {
log.Error("failed to get version by chunk", "error", err.Error())
log.Error("failed to get hard fork name by batch", "error", err.Error())
return nil, ErrCoordinatorInternalFailure
}
}
@@ -222,7 +222,6 @@ func (bp *BatchProverTask) getChunkRangeByName(ctx *gin.Context, hardForkName st
endChunkIndex = toChunk.Index
}
}
fmt.Printf("%s index range %+v\n", hardForkName, &chunkIndexRange{startChunkIndex, endChunkIndex})
return &chunkIndexRange{startChunkIndex, endChunkIndex}, nil
}
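
Note: the two forms of the contains predicate shown here and in the chunk prover task below, r.end >= end+1 and r.end > end, agree for uint64 operands everywhere except when end is already the maximum value, where end+1 wraps to 0 and the first form accepts everything. A self-contained check of that edge case (illustration only, not taken from the repository):

package main

import (
	"fmt"
	"math"
)

func main() {
	rEnd := uint64(10)

	// Typical case: both forms give the same answer.
	end := uint64(9)
	fmt.Println(rEnd >= end+1, rEnd > end) // true true

	// Edge case: end+1 wraps around to 0, so the first form is always true.
	end = math.MaxUint64
	fmt.Println(rEnd >= end+1, rEnd > end) // true false
}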

View File

@@ -122,12 +122,12 @@ func (cp *ChunkProverTask) doAssignTaskWithinBlockRange(ctx *gin.Context, taskCt
var (
proverVersion = taskCtx.ProverVersion
hardForkName = taskCtx.HardForkName
err error
)
var err error
if getHardForkName != nil {
hardForkName, err = getHardForkName(chunkTask)
if err != nil {
log.Error("failed to get version by chunk", "error", err.Error())
log.Error("failed to get hard fork name by chunk", "error", err.Error())
return nil, ErrCoordinatorInternalFailure
}
}
@@ -229,7 +229,7 @@ func (r *blockRange) merge(o blockRange) (*blockRange, error) {
}
func (r *blockRange) contains(start, end uint64) bool {
return r.from <= start && r.to >= end+1
return r.from <= start && r.to > end
}
func (cp *ChunkProverTask) getBlockRangeByName(hardForkName string) (*blockRange, error) {
@@ -251,7 +251,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}
if len(getTaskParameter.VKs) > 0 {
return cp.assignWithTwoCircuits(ctx, taskCtx, getTaskParameter)
}
return cp.assignWithSingleCircuit(ctx, taskCtx, getTaskParameter)

View File

@@ -94,13 +94,7 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context, getTaskParameter *coor
if len(getTaskParameter.VKs) != 2 {
return nil, fmt.Errorf("parameter vks length must be 2")
}
// min prover version supporting multi circuits, maybe put it to config file?
var minMultiCircuitsProverVersion = "v4.4.7"
if !version.CheckScrollRepoVersion(ptc.ProverVersion, minMultiCircuitsProverVersion) {
return nil, fmt.Errorf("incompatible prover version. please upgrade your prover, minimum allowed version: %s, actual version: %s", minMultiCircuitsProverVersion, ptc.ProverVersion)
}
for _, vk := range getTaskParameter.VKs {
fmt.Printf("%+v\n", b.reverseVkMap)
if _, exists := b.reverseVkMap[vk]; !exists {
return nil, fmt.Errorf("incompatible vk. vk %s is invalid", vk)
}
@@ -131,10 +125,10 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context, getTaskParameter *coor
isBlocked, err := b.proverBlockListOrm.IsPublicKeyBlocked(ctx.Copy(), publicKey.(string))
if err != nil {
return nil, fmt.Errorf("failed to check whether the public key %s is blocked before assigning a chunk task, err: %w, proverName: %s", publicKey, err, proverName)
return nil, fmt.Errorf("failed to check whether the public key %s is blocked before assigning a chunk task, err: %w, proverName: %s, proverVersion: %s", publicKey, err, proverName, proverVersion)
}
if isBlocked {
return nil, fmt.Errorf("public key %s is blocked from fetching tasks. ProverName: %s, ProverVersion: %s", publicKey, proverName, ptc.ProverVersion)
return nil, fmt.Errorf("public key %s is blocked from fetching tasks. ProverName: %s, ProverVersion: %s", publicKey, proverName, proverVersion)
}
isAssigned, err := b.proverTaskOrm.IsProverAssigned(ctx.Copy(), publicKey.(string))
@@ -143,7 +137,7 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context, getTaskParameter *coor
}
if isAssigned {
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task. ProverName: %s", publicKey, proverName)
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task. ProverName: %s, ProverVersion: %s", publicKey, proverName, proverVersion)
}
return &ptc, nil
}

View File

@@ -12,17 +12,17 @@ import (
func NewVerifier(cfg *config.VerifierConfig) (*Verifier, error) {
batchVKMap := map[string]string{
"shanghai": "",
"bernoulli": "bernoulli",
"london": "london",
"istanbul": "istanbul",
"bernoulli": "",
"london": "",
"istanbul": "",
"homestead": "",
"eip155": "",
}
chunkVKMap := map[string]string{
"shanghai": "",
"bernoulli": "bernoulli",
"london": "london",
"istanbul": "istanbul",
"bernoulli": "",
"london": "",
"istanbul": "",
"homestead": "",
"eip155": "",
}

View File

@@ -32,7 +32,7 @@ func v1(router *gin.RouterGroup, conf *config.Config) {
r.POST("/login", challengeMiddleware.MiddlewareFunc(), loginMiddleware.LoginHandler)
// need jwt token api
// r.Use(loginMiddleware.MiddlewareFunc())
r.Use(loginMiddleware.MiddlewareFunc())
{
r.POST("/get_task", api.GetTask.GetTasks)
r.POST("/submit_proof", api.SubmitProof.SubmitProof)

View File

@@ -4,8 +4,8 @@ package types
type GetTaskParameter struct {
ProverHeight uint64 `form:"prover_height" json:"prover_height"`
TaskType int `form:"task_type" json:"task_type"`
VK string `form:"vk" json:"vk"`
VKs []string `form:"vks" json:"vks"`
VK string `form:"vk" json:"vk"` // will be deprecated after all go_prover offline
VKs []string `form:"vks" json:"vks"` // for rust_prover that supporting multi-circuits
}
// GetTaskSchema the schema data return to prover for get prover task
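
Note: the new VKs field is what the multi-circuits check in checkParameter above validates (exactly two entries), while the single VK field remains only for the legacy go_prover. A small, self-contained illustration of the two request shapes; all field values are placeholders.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of the GetTaskParameter struct shown in the diff.
type GetTaskParameter struct {
	ProverHeight uint64   `form:"prover_height" json:"prover_height"`
	TaskType     int      `form:"task_type" json:"task_type"`
	VK           string   `form:"vk" json:"vk"`   // legacy go_prover, single circuit
	VKs          []string `form:"vks" json:"vks"` // rust_prover, multi-circuits
}

func main() {
	// Multi-circuits prover: exactly two verifying keys, as required by checkParameter
	// (placeholder values; real provers send encoded verifying keys).
	multi := GetTaskParameter{
		ProverHeight: 1234,
		TaskType:     1,
		VKs:          []string{"<vk_circuit_a>", "<vk_circuit_b>"},
	}

	// Legacy prover: single vk, to be dropped once all go_provers are offline.
	legacy := GetTaskParameter{
		ProverHeight: 1234,
		TaskType:     1,
		VK:           "<vk_single>",
	}

	for _, p := range []GetTaskParameter{multi, legacy} {
		b, _ := json.Marshal(p)
		fmt.Println(string(b))
	}
}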

View File

@@ -18,7 +18,6 @@ import (
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"github.com/stretchr/testify/assert"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"scroll-tech/common/testcontainers"
@@ -27,7 +26,6 @@ import (
"scroll-tech/common/version"
"scroll-tech/database/migrate"
cutils "scroll-tech/common/utils"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/controller/api"
"scroll-tech/coordinator/internal/controller/cron"
@@ -82,25 +80,6 @@ func randomURL() string {
return fmt.Sprintf("localhost:%d", 10000+2000+id.Int64())
}
func useLocalDB(dsn string) *gorm.DB {
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
// Logger: &tmpGormLogger,
NowFunc: func() time.Time {
// why set time to UTC.
// if now set this, the inserted data time will use local timezone. like 2023-07-18 18:24:00 CST+8
// but when inserted, store to postgres is 2023-07-18 18:24:00 UTC+0 the timezone is incorrect.
// As mysql dsn user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local, we cant set
// the timezone by loc=Local. but postgres's dsn don't have loc option to set timezone, so just need set the gorm option like that.
return cutils.NowUTC()
},
})
if err != nil {
fmt.Println("failed to init db", err.Error())
panic(err.Error())
}
return db
}
func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL string, nameForkMap map[string]int64) (*cron.Collector, *http.Server) {
var err error
db, err = testApps.GetGormDBClient()
@@ -221,14 +200,14 @@ func TestApis(t *testing.T) {
// Set up the test environment.
setEnv(t)
// t.Run("TestHandshake", testHandshake)
// t.Run("TestFailedHandshake", testFailedHandshake)
// t.Run("TestGetTaskBlocked", testGetTaskBlocked)
// t.Run("TestOutdatedProverVersion", testOutdatedProverVersion)
// t.Run("TestValidProof", testValidProof)
// t.Run("TestInvalidProof", testInvalidProof)
// t.Run("TestProofGeneratedFailed", testProofGeneratedFailed)
// t.Run("TestTimeoutProof", testTimeoutProof)
t.Run("TestHandshake", testHandshake)
t.Run("TestFailedHandshake", testFailedHandshake)
t.Run("TestGetTaskBlocked", testGetTaskBlocked)
t.Run("TestOutdatedProverVersion", testOutdatedProverVersion)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestProofGeneratedFailed", testProofGeneratedFailed)
t.Run("TestTimeoutProof", testTimeoutProof)
t.Run("TestHardFork", testHardForkAssignTask)
}
@@ -498,106 +477,6 @@ func testHardForkAssignTask(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
coordinatorURL := randomURL()
collector, httpHandler := setupCoordinator(t, 3, coordinatorURL, tt.forkNumbers)
defer func() {
collector.Stop()
assert.NoError(t, httpHandler.Shutdown(context.Background()))
}()
chunkProof := &message.ChunkProof{
StorageTrace: []byte("testStorageTrace"),
Protocol: []byte("testProtocol"),
Proof: []byte("testProof"),
Instances: []byte("testInstance"),
Vk: []byte("testVk"),
ChunkInfo: nil,
}
// the insert block number is 2 and 3
// chunk1 batch1 contains block number 2
// chunk2 batch2 contains block number 3
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
assert.NoError(t, err)
dbHardForkChunk1, err := chunkOrm.InsertChunk(context.Background(), hardForkChunk1)
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 2, dbHardForkChunk1.Hash)
assert.NoError(t, err)
err = chunkOrm.UpdateProofAndProvingStatusByHash(context.Background(), dbHardForkChunk1.Hash, chunkProof, types.ProvingTaskUnassigned, 1)
assert.NoError(t, err)
dbHardForkBatch1, err := batchOrm.InsertBatch(context.Background(), hardForkBatch1)
assert.NoError(t, err)
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, dbHardForkBatch1.Hash)
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), dbHardForkBatch1.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
dbHardForkChunk2, err := chunkOrm.InsertChunk(context.Background(), hardForkChunk2)
assert.NoError(t, err)
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 3, 100, dbHardForkChunk2.Hash)
assert.NoError(t, err)
err = chunkOrm.UpdateProofAndProvingStatusByHash(context.Background(), dbHardForkChunk2.Hash, chunkProof, types.ProvingTaskUnassigned, 1)
assert.NoError(t, err)
dbHardForkBatch2, err := batchOrm.InsertBatch(context.Background(), hardForkBatch2)
assert.NoError(t, err)
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 1, 1, dbHardForkBatch2.Hash)
assert.NoError(t, err)
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), dbHardForkBatch2.Hash, types.ChunkProofsStatusReady)
assert.NoError(t, err)
fmt.Println("data inserted")
time.Sleep(50 * time.Second)
getTaskNumber := 0
for i := 0; i < 2; i++ {
mockProver := newMockProver(t, fmt.Sprintf("mock_prover_%d", i), coordinatorURL, tt.proofType, version.Version)
proverTask, errCode, errMsg := mockProver.getProverTask(t, tt.proofType, tt.proverForkNames[i])
assert.Equal(t, tt.exceptGetTaskErrCodes[i], errCode)
assert.Equal(t, tt.exceptGetTaskErrMsgs[i], errMsg)
if errCode != types.Success {
continue
}
getTaskNumber++
mockProver.submitProof(t, proverTask, verifiedSuccess, types.Success, tt.proverForkNames[i])
}
assert.Equal(t, getTaskNumber, tt.exceptTaskNumber)
})
}
}
func testHardForkAssignTaskMultiCircuits(t *testing.T) {
tests := []struct {
name string
proofType message.ProofType
forkNumbers map[string]int64
proverForkNames []string
exceptTaskNumber int
exceptGetTaskErrCodes []int
exceptGetTaskErrMsgs []string
}{
{ // hard fork 4, prover 4 block [2-3]
name: "noTaskForkChunkProverVersionLargeOrEqualThanHardFork",
proofType: message.ProofTypeChunk,
forkNumbers: map[string]int64{"bernoulli": forkNumberFour},
exceptTaskNumber: 0,
proverForkNames: []string{"bernoulli", "bernoulli"},
exceptGetTaskErrCodes: []int{types.ErrCoordinatorEmptyProofData, types.ErrCoordinatorEmptyProofData},
exceptGetTaskErrMsgs: []string{"get empty prover task", "get empty prover task"},
},
{
name: "noTaskForkBatchProverVersionLargeOrEqualThanHardFork",
proofType: message.ProofTypeBatch,
forkNumbers: map[string]int64{"bernoulli": forkNumberFour},
exceptTaskNumber: 0,
proverForkNames: []string{"bernoulli", "bernoulli"},
exceptGetTaskErrCodes: []int{types.ErrCoordinatorEmptyProofData, types.ErrCoordinatorEmptyProofData},
exceptGetTaskErrMsgs: []string{"get empty prover task", "get empty prover task"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
coordinatorURL := randomURL()

View File

@@ -1,112 +0,0 @@
package main
import (
"context"
"errors"
"math/big"
"net/http"
"scroll-tech/common/database"
"scroll-tech/common/version"
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/controller/api"
"scroll-tech/coordinator/internal/controller/cron"
"scroll-tech/coordinator/internal/route"
"time"
"github.com/gin-gonic/gin"
"github.com/scroll-tech/go-ethereum/params"
"gorm.io/gorm"
)
// GetGormDBClient returns a gorm.DB by connecting to the running postgres container
func GetGormDBClient() (*gorm.DB, error) {
// endpoint, err := t.GetDBEndPoint()
// if err != nil {
// return nil, err
// }
endpoint := "postgres://lmr:@localhost:5432/unittest?sslmode=disable"
dbCfg := &database.Config{
DSN: endpoint,
DriverName: "postgres",
MaxOpenNum: 200,
MaxIdleNum: 20,
}
return database.InitDB(dbCfg)
}
func setupCoordinator(proversPerSession uint8, coordinatorURL string, nameForkMap map[string]int64) (*cron.Collector, *http.Server) {
db, err := GetGormDBClient()
if err != nil {
panic(err.Error())
}
tokenTimeout := 6
conf := &config.Config{
L2: &config.L2{
ChainID: 111,
},
ProverManager: &config.ProverManager{
ProversPerSession: proversPerSession,
Verifier: &config.VerifierConfig{
MockMode: true,
},
BatchCollectionTimeSec: 10,
ChunkCollectionTimeSec: 10,
MaxVerifierWorkers: 10,
SessionAttempts: 5,
MinProverVersion: version.Version,
},
Auth: &config.Auth{
ChallengeExpireDurationSec: tokenTimeout,
LoginExpireDurationSec: tokenTimeout,
},
}
var chainConf params.ChainConfig
for forkName, forkNumber := range nameForkMap {
switch forkName {
case "shanghai":
chainConf.ShanghaiBlock = big.NewInt(forkNumber)
case "bernoulli":
chainConf.BernoulliBlock = big.NewInt(forkNumber)
case "london":
chainConf.LondonBlock = big.NewInt(forkNumber)
case "istanbul":
chainConf.IstanbulBlock = big.NewInt(forkNumber)
case "homestead":
chainConf.HomesteadBlock = big.NewInt(forkNumber)
case "eip155":
chainConf.EIP155Block = big.NewInt(forkNumber)
}
}
proofCollector := cron.NewCollector(context.Background(), db, conf, nil)
router := gin.New()
api.InitController(conf, &chainConf, db, nil)
route.Route(router, conf, nil)
srv := &http.Server{
Addr: coordinatorURL,
Handler: router,
}
go func() {
runErr := srv.ListenAndServe()
if runErr != nil && !errors.Is(runErr, http.ErrServerClosed) {
panic(runErr.Error())
}
}()
time.Sleep(time.Second * 2)
return proofCollector, srv
}
func main() {
coordinatorURL := ":9091"
nameForkMap := map[string]int64{"london": 2,
"istanbul": 3,
"bernoulli": 4}
setupCoordinator(1, coordinatorURL, nameForkMap)
var c = make(chan struct{}, 1)
_ = <-c
}

View File

@@ -32,7 +32,7 @@ func setupL1RelayerDB(t *testing.T) *gorm.DB {
return db
}
// testCreateNewRelayer test create new relayer instance and stop
// testCreateNewL1Relayer test create new relayer instance and stop
func testCreateNewL1Relayer(t *testing.T) {
db := setupL1RelayerDB(t)
defer database.CloseDB(db)