Compare commits

..

8 Commits

Author SHA1 Message Date
HAOYUatHZ
2cccdf7003 Merge branch 'develop' into env1-sampling 2024-04-13 13:10:55 +08:00
HAOYUatHZ
6cc1f424d7 revert hongfan 2024-04-13 09:24:45 +08:00
HAOYUatHZ
af8437aa9d Merge branch 'develop' into env1-sampling 2024-04-12 11:54:20 +08:00
HAOYUatHZ
133547c91c Merge branch 'develop' into env1-sampling 2024-04-11 19:45:51 +08:00
HAOYUatHZ
fcbf83bf3d fix 2024-04-09 09:29:42 +08:00
HAOYUatHZ
330fde44a9 fix 2024-04-09 08:59:04 +08:00
HAOYUatHZ
122cffe489 implement 2024-04-08 22:24:58 +08:00
HAOYUatHZ
b4dac7ab82 init config 2024-04-08 22:10:53 +08:00
14 changed files with 106 additions and 86 deletions

View File

@@ -46,7 +46,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/event_watcher.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -91,7 +90,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/gas_oracle.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -136,7 +134,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/rollup_relayer.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -181,7 +178,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/bridgehistoryapi-fetcher.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -226,7 +222,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/bridgehistoryapi-api.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -271,7 +266,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/coordinator-api.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
@@ -316,7 +310,6 @@ jobs:
with:
context: .
file: ./build/dockerfiles/coordinator-cron.Dockerfile
platforms: linux/amd64,linux/arm64
push: true
tags: |
${{ secrets.DOCKERHUB_USERNAME }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}

View File

@@ -61,11 +61,8 @@ func (t *TestcontainerApps) StartL1GethContainer() error {
req := testcontainers.ContainerRequest{
Image: "scroll_l1geth",
ExposedPorts: []string{"8546/tcp", "8545/tcp"},
WaitingFor: wait.ForAll(
wait.ForListeningPort("8546").WithStartupTimeout(100*time.Second),
wait.ForListeningPort("8545").WithStartupTimeout(100*time.Second),
),
Cmd: []string{"--log.debug", "ANY"},
WaitingFor: wait.ForHTTP("/").WithPort("8545").WithStartupTimeout(100 * time.Second),
Cmd: []string{"--log.debug", "ANY"},
}
genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
@@ -88,10 +85,7 @@ func (t *TestcontainerApps) StartL2GethContainer() error {
req := testcontainers.ContainerRequest{
Image: "scroll_l2geth",
ExposedPorts: []string{"8546/tcp", "8545/tcp"},
WaitingFor: wait.ForAll(
wait.ForListeningPort("8546").WithStartupTimeout(100*time.Second),
wait.ForListeningPort("8545").WithStartupTimeout(100*time.Second),
),
WaitingFor: wait.ForHTTP("/").WithPort("8545").WithStartupTimeout(100 * time.Second),
}
genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.3.94"
var tag = "v4.3.92"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -38,7 +38,7 @@ make lint
## Configure
The coordinator behavior can be configured using [`conf/config.json`](conf/config.json). Check the code comments under `ProverManager` in [`internal/config/config.go`](internal/config/config.go) for more details.
The coordinator behavior can be configured using [`config.json`](config.json). Check the code comments under `ProverManager` in [`config/config.go`](config/config.go) for more details.
## Start

View File

@@ -30,8 +30,8 @@ func (c *Collector) cleanupChallenge() {
log.Error("manager context canceled with error", "error", c.ctx.Err())
}
return
case <-c.stopCleanChallengeChan:
log.Info("the coordinator cleanupChallenge run loop exit")
case <-c.stopTimeoutChan:
log.Info("the coordinator run loop exit")
return
}
}

View File

@@ -23,10 +23,7 @@ type Collector struct {
db *gorm.DB
ctx context.Context
stopChunkTimeoutChan chan struct{}
stopBatchTimeoutChan chan struct{}
stopBatchAllChunkReadyChan chan struct{}
stopCleanChallengeChan chan struct{}
stopTimeoutChan chan struct{}
proverTaskOrm *orm.ProverTask
chunkOrm *orm.Chunk
@@ -43,17 +40,14 @@ type Collector struct {
// NewCollector create a collector to cron collect the data to send to prover
func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prometheus.Registerer) *Collector {
c := &Collector{
cfg: cfg,
db: db,
ctx: ctx,
stopChunkTimeoutChan: make(chan struct{}),
stopBatchTimeoutChan: make(chan struct{}),
stopBatchAllChunkReadyChan: make(chan struct{}),
stopCleanChallengeChan: make(chan struct{}),
proverTaskOrm: orm.NewProverTask(db),
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
challenge: orm.NewChallenge(db),
cfg: cfg,
db: db,
ctx: ctx,
stopTimeoutChan: make(chan struct{}),
proverTaskOrm: orm.NewProverTask(db),
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
challenge: orm.NewChallenge(db),
timeoutBatchCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_batch_timeout_checker_run_total",
@@ -89,10 +83,7 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom
// Stop all the collector
func (c *Collector) Stop() {
c.stopChunkTimeoutChan <- struct{}{}
c.stopBatchTimeoutChan <- struct{}{}
c.stopBatchAllChunkReadyChan <- struct{}{}
c.stopCleanChallengeChan <- struct{}{}
c.stopTimeoutChan <- struct{}{}
}
// timeoutTask cron check the send task is timeout. if timeout reached, restore the
@@ -122,8 +113,8 @@ func (c *Collector) timeoutBatchProofTask() {
log.Error("manager context canceled with error", "error", c.ctx.Err())
}
return
case <-c.stopBatchTimeoutChan:
log.Info("the coordinator timeoutBatchProofTask run loop exit")
case <-c.stopTimeoutChan:
log.Info("the coordinator run loop exit")
return
}
}
@@ -155,8 +146,8 @@ func (c *Collector) timeoutChunkProofTask() {
log.Error("manager context canceled with error", "error", c.ctx.Err())
}
return
case <-c.stopChunkTimeoutChan:
log.Info("the coordinator timeoutChunkProofTask run loop exit")
case <-c.stopTimeoutChan:
log.Info("the coordinator run loop exit")
return
}
}
@@ -262,8 +253,8 @@ func (c *Collector) checkBatchAllChunkReady() {
log.Error("manager context canceled with error", "error", c.ctx.Err())
}
return
case <-c.stopBatchAllChunkReadyChan:
log.Info("the coordinator checkBatchAllChunkReady run loop exit")
case <-c.stopTimeoutChan:
log.Info("the coordinator run loop exit")
return
}
}

View File

@@ -59,20 +59,20 @@ func testResetDB(t *testing.T) {
cur, err := Current(pgDB)
assert.NoError(t, err)
// total number of tables.
assert.Equal(t, int64(18), cur)
assert.Equal(t, int64(17), cur)
}
func testMigrate(t *testing.T) {
assert.NoError(t, Migrate(pgDB))
cur, err := Current(pgDB)
assert.NoError(t, err)
assert.Equal(t, int64(18), cur)
assert.Equal(t, int64(17), cur)
}
func testRollback(t *testing.T) {
version, err := Current(pgDB)
assert.NoError(t, err)
assert.Equal(t, int64(18), version)
assert.Equal(t, int64(17), version)
assert.NoError(t, Rollback(pgDB, nil))

View File

@@ -1,18 +0,0 @@
-- +goose Up
-- +goose StatementBegin
create index if not exists idx_prover_task_created_at on prover_task(created_at) where deleted_at IS NULL;
create index if not exists idx_prover_task_task_id on prover_task(task_id) where deleted_at IS NULL;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
drop index if exists idx_prover_task_created_at;
drop index if exists idx_prover_task_task_id;
-- +goose StatementEnd

View File

@@ -24,7 +24,7 @@ rollup_relayer: ## Builds the rollup_relayer bin
test:
go test -v -race -coverprofile=coverage.txt -covermode=atomic -p 1 $(PWD)/...
lint: mock_abi ## Lint the files - used for CI
lint: ## Lint the files - used for CI
GOBIN=$(PWD)/build/bin go run ../build/lint.go
clean: ## Empty out the bin folder

View File

@@ -42,4 +42,7 @@ type BatchProposerConfig struct {
MaxL1CommitCalldataSizePerBatch uint64 `json:"max_l1_commit_calldata_size_per_batch"`
BatchTimeoutSec uint64 `json:"batch_timeout_sec"`
GasCostIncreaseMultiplier float64 `json:"gas_cost_increase_multiplier"`
EnableTestEnvSamplingFeature bool `json:"enable_test_env_sampling_feature,omitempty"`
SamplingPercentage uint64 `json:"sampling_percentage,omitempty"`
}

View File

@@ -64,6 +64,9 @@ type RelayerConfig struct {
EnableTestEnvBypassFeatures bool `json:"enable_test_env_bypass_features"`
// The timeout in seconds for finalizing a batch without proof, only used when EnableTestEnvBypassFeatures is true.
FinalizeBatchWithoutProofTimeoutSec uint64 `json:"finalize_batch_without_proof_timeout_sec"`
EnableTestEnvSamplingFeature bool `json:"enable_test_env_sampling_feature,omitempty"`
SamplingPercentage uint64 `json:"sampling_percentage,omitempty"`
}
// GasOracleConfig The config for updating gas price oracle.
@@ -128,6 +131,10 @@ func (r *RelayerConfig) UnmarshalJSON(input []byte) error {
return fmt.Errorf("error converting and checking finalize sender private key: %w", err)
}
if r.SamplingPercentage == 0 {
r.SamplingPercentage = 100
}
return nil
}

View File

@@ -464,8 +464,9 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
case types.ProvingTaskVerified:
log.Info("Start to roll up zk proof", "hash", batch.Hash)
r.metrics.rollupL2RelayerProcessCommittedBatchesFinalizedTotal.Inc()
if err := r.finalizeBatch(batch, true); err != nil {
log.Error("Failed to finalize batch with proof", "index", batch.Index, "hash", batch.Hash, "err", err)
skipProof := r.cfg.EnableTestEnvSamplingFeature && ((batch.Index % 100) >= r.cfg.SamplingPercentage)
if err := r.finalizeBatch(batch, !skipProof); err != nil {
log.Error("Failed to finalize batch", "index", batch.Index, "hash", batch.Hash, "withProof", !skipProof, "err", err)
}
case types.ProvingTaskFailed:
@@ -578,7 +579,7 @@ func (r *Layer2Relayer) finalizeBatch(dbBatch *orm.Batch, withProof bool) error
return err
}
log.Info("finalizeBatch in layer1", "with proof", withProof, "index", dbBatch.Index, "batch hash", dbBatch.Hash, "tx hash", txHash.String())
log.Info("finalizeBatch in layer1", "with proof", withProof, "index", dbBatch.Index, "batch hash", dbBatch.Hash, "tx hash", txHash)
// record and sync with db, @todo handle db error
if err := r.batchOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, dbBatch.Hash, txHash.String(), types.RollupFinalizing); err != nil {
@@ -586,22 +587,22 @@ func (r *Layer2Relayer) finalizeBatch(dbBatch *orm.Batch, withProof bool) error
return err
}
// Updating the proving status when finalizing without proof, thus the coordinator could omit this task.
// it isn't a necessary step, so don't put in a transaction with UpdateFinalizeTxHashAndRollupStatus
if !withProof {
txErr := r.db.Transaction(func(tx *gorm.DB) error {
if updateErr := r.batchOrm.UpdateProvingStatus(r.ctx, dbBatch.Hash, types.ProvingTaskVerified); updateErr != nil {
return updateErr
}
if updateErr := r.chunkOrm.UpdateProvingStatusByBatchHash(r.ctx, dbBatch.Hash, types.ProvingTaskVerified); updateErr != nil {
return updateErr
}
return nil
})
if txErr != nil {
log.Error("Updating chunk and batch proving status when finalizing without proof failure", "batchHash", dbBatch.Hash, "err", txErr)
}
}
// // Updating the proving status when finalizing without proof, thus the coordinator could omit this task.
// // it isn't a necessary step, so don't put in a transaction with UpdateFinalizeTxHashAndRollupStatus
// if !withProof {
// txErr := r.db.Transaction(func(tx *gorm.DB) error {
// if updateErr := r.batchOrm.UpdateProvingStatus(r.ctx, dbBatch.Hash, types.ProvingTaskVerified); updateErr != nil {
// return updateErr
// }
// if updateErr := r.chunkOrm.UpdateProvingStatusByBatchHash(r.ctx, dbBatch.Hash, types.ProvingTaskVerified); updateErr != nil {
// return updateErr
// }
// return nil
// })
// if txErr != nil {
// log.Error("Updating chunk and batch proving status when finalizing without proof failure", "batchHash", dbBatch.Hash, "err", txErr)
// }
// }
r.metrics.rollupL2RelayerProcessCommittedBatchesFinalizedSuccessTotal.Inc()
return nil

View File

@@ -14,6 +14,7 @@ import (
"gorm.io/gorm"
"scroll-tech/common/forks"
"scroll-tech/common/types"
"scroll-tech/common/types/encoding"
"scroll-tech/rollup/internal/config"
@@ -37,6 +38,7 @@ type BatchProposer struct {
gasCostIncreaseMultiplier float64
forkMap map[uint64]bool
cfg *config.BatchProposerConfig
chainCfg *params.ChainConfig
batchProposerCircleTotal prometheus.Counter
@@ -74,6 +76,7 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, chai
batchTimeoutSec: cfg.BatchTimeoutSec,
gasCostIncreaseMultiplier: cfg.GasCostIncreaseMultiplier,
forkMap: forkMap,
cfg: cfg,
chainCfg: chainCfg,
batchProposerCircleTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -144,6 +147,27 @@ func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion en
log.Warn("BatchProposer.UpdateBatchHashInRange update the chunk's batch hash failure", "hash", batch.Hash, "error", dbErr)
return dbErr
}
skipProof := false
if p.cfg.EnableTestEnvSamplingFeature && ((batch.Index % 100) >= p.cfg.SamplingPercentage) {
skipProof = true
}
if skipProof {
dbErr = p.batchOrm.UpdateProvingStatus(p.ctx, batch.Hash, types.ProvingTaskVerified, dbTX)
if dbErr != nil {
log.Warn("BatchProposer.updateBatchInfoInDB update batch proving_status failure",
"batch hash", batch.Hash, "error", dbErr)
return dbErr
}
dbErr = p.chunkOrm.UpdateProvingStatusInRange(p.ctx, batch.StartChunkIndex, batch.EndChunkIndex, types.ProvingTaskVerified, dbTX)
if dbErr != nil {
log.Warn("BatchProposer.updateBatchInfoInDB update chunk proving_status failure",
"start chunk index", batch.StartChunkIndex, "end chunk index", batch.EndChunkIndex,
"batch hash", batch.Hash, "error", dbErr)
return dbErr
}
}
return nil
})
if err != nil {

View File

@@ -306,3 +306,28 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
}
return nil
}
func (o *Chunk) UpdateProvingStatusInRange(ctx context.Context, startIndex uint64, endIndex uint64, status types.ProvingStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("index >= ? AND index <= ?", startIndex, endIndex)
updateFields := make(map[string]interface{})
updateFields["proving_status"] = int(status)
switch status {
case types.ProvingTaskAssigned:
updateFields["prover_assigned_at"] = time.Now()
case types.ProvingTaskUnassigned:
updateFields["prover_assigned_at"] = nil
case types.ProvingTaskVerified:
updateFields["proved_at"] = time.Now()
}
if err := db.Updates(updateFields).Error; err != nil {
return fmt.Errorf("Chunk.UpdateProvingStatusInRange error: %w, start index: %v, end index: %v, status: %v", err, startIndex, endIndex, status.String())
}
return nil
}