mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 23:48:15 -05:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c44b7f7bf4 | ||
|
|
a8c71b5e36 | ||
|
|
ae2f62df00 | ||
|
|
ce5c6e0aa3 | ||
|
|
e8ddf99184 |
@@ -226,7 +226,7 @@ func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) {
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
case cfm := <-r.gasOracleSender.ConfirmChan():
|
||||
r.metrics.bridgeL1MsgsRelayedConfirmedTotal.Inc()
|
||||
r.metrics.bridgeL1GasOraclerConfirmedTotal.Inc()
|
||||
if !cfm.IsSuccessful {
|
||||
// @discuss: maybe make it pending again?
|
||||
err := r.l1BlockOrm.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
var tag = "v4.1.98"
|
||||
var tag = "v4.1.103"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
@@ -78,7 +78,7 @@ func CheckScrollProverVersionTag(proverVersion string) bool {
|
||||
if remoteTagMinor != 1 {
|
||||
return false
|
||||
}
|
||||
if remoteTagPatch < 96 {
|
||||
if remoteTagPatch < 98 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
@@ -29,10 +29,11 @@ type Collector struct {
|
||||
chunkOrm *orm.Chunk
|
||||
batchOrm *orm.Batch
|
||||
|
||||
timeoutBatchCheckerRunTotal prometheus.Counter
|
||||
batchProverTaskTimeoutTotal prometheus.Counter
|
||||
timeoutChunkCheckerRunTotal prometheus.Counter
|
||||
chunkProverTaskTimeoutTotal prometheus.Counter
|
||||
timeoutBatchCheckerRunTotal prometheus.Counter
|
||||
batchProverTaskTimeoutTotal prometheus.Counter
|
||||
timeoutChunkCheckerRunTotal prometheus.Counter
|
||||
chunkProverTaskTimeoutTotal prometheus.Counter
|
||||
checkBatchAllChunkReadyRunTotal prometheus.Counter
|
||||
}
|
||||
|
||||
// NewCollector create a collector to cron collect the data to send to prover
|
||||
@@ -62,10 +63,15 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom
|
||||
Name: "coordinator_chunk_prover_task_timeout_total",
|
||||
Help: "Total number of chunk timeout prover task.",
|
||||
}),
|
||||
checkBatchAllChunkReadyRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
||||
Name: "coordinator_check_batch_all_chunk_ready_run_total",
|
||||
Help: "Total number of check batch all chunks ready total",
|
||||
}),
|
||||
}
|
||||
|
||||
go c.timeoutBatchProofTask()
|
||||
go c.timeoutChunkProofTask()
|
||||
go c.checkBatchAllChunkReady()
|
||||
|
||||
log.Info("Start coordinator successfully.")
|
||||
|
||||
@@ -79,7 +85,6 @@ func (c *Collector) Stop() {
|
||||
|
||||
// timeoutTask cron check the send task is timeout. if timeout reached, restore the
|
||||
// chunk/batch task to unassigned. then the batch/chunk collector can retry it.
|
||||
|
||||
func (c *Collector) timeoutBatchProofTask() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
@@ -189,3 +194,60 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) checkBatchAllChunkReady() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
nerr := fmt.Errorf("check batch all chunk ready panic error:%v", err)
|
||||
log.Warn(nerr.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(time.Second * 10)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
c.checkBatchAllChunkReadyRunTotal.Inc()
|
||||
page := 1
|
||||
pageSize := 50
|
||||
for {
|
||||
offset := (page - 1) * pageSize
|
||||
batches, err := c.batchOrm.GetUnassignedAndChunksUnreadyBatches(c.ctx, offset, pageSize)
|
||||
if err != nil {
|
||||
log.Warn("checkBatchAllChunkReady GetUnassignedAndChunksUnreadyBatches", "error", err)
|
||||
break
|
||||
}
|
||||
|
||||
for _, batch := range batches {
|
||||
allReady, checkErr := c.chunkOrm.CheckIfBatchChunkProofsAreReady(c.ctx, batch.Hash)
|
||||
if checkErr != nil {
|
||||
log.Warn("checkBatchAllChunkReady CheckIfBatchChunkProofsAreReady failure", "error", checkErr, "hash", batch.Hash)
|
||||
continue
|
||||
}
|
||||
|
||||
if !allReady {
|
||||
continue
|
||||
}
|
||||
|
||||
if updateErr := c.batchOrm.UpdateChunkProofsStatusByBatchHash(c.ctx, batch.Hash, types.ChunkProofsStatusReady); updateErr != nil {
|
||||
log.Warn("checkBatchAllChunkReady UpdateChunkProofsStatusByBatchHash failure", "error", checkErr, "hash", batch.Hash)
|
||||
}
|
||||
}
|
||||
|
||||
if len(batches) < pageSize {
|
||||
break
|
||||
}
|
||||
page++
|
||||
}
|
||||
|
||||
case <-c.ctx.Done():
|
||||
if c.ctx.Err() != nil {
|
||||
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
||||
}
|
||||
return
|
||||
case <-c.stopTimeoutChan:
|
||||
log.Info("the coordinator run loop exit")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -377,7 +377,7 @@ func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk st
|
||||
log.Error("update prover task proving status failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", updateErr)
|
||||
}
|
||||
|
||||
proverTasks, err := m.proverTaskOrm.GetValidOrAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk)
|
||||
proverTasks, err := m.proverTaskOrm.GetAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk)
|
||||
if err != nil {
|
||||
log.Warn("checkIsAssignedToOtherProver failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
|
||||
return
|
||||
|
||||
@@ -91,6 +91,26 @@ func (o *Batch) GetUnassignedBatches(ctx context.Context, limit int) ([]*Batch,
|
||||
return batches, nil
|
||||
}
|
||||
|
||||
// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
|
||||
func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset, limit int) ([]*Batch, error) {
|
||||
if offset < 0 || limit < 0 {
|
||||
return nil, errors.New("limit and offset must not be smaller than 0")
|
||||
}
|
||||
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
|
||||
db = db.Where("chunk_proofs_status = ?", types.ChunkProofsStatusPending)
|
||||
db = db.Order("index ASC")
|
||||
db = db.Offset(offset)
|
||||
db = db.Limit(limit)
|
||||
|
||||
var batches []*Batch
|
||||
if err := db.Find(&batches).Error; err != nil {
|
||||
return nil, fmt.Errorf("Batch.GetUnassignedAndChunksUnreadyBatches error: %w", err)
|
||||
}
|
||||
return batches, nil
|
||||
}
|
||||
|
||||
// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned.
|
||||
func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
|
||||
@@ -131,14 +131,14 @@ func (o *ProverTask) GetProverTaskByTaskIDAndProver(ctx context.Context, taskTyp
|
||||
return &proverTask, nil
|
||||
}
|
||||
|
||||
// GetValidOrAssignedTaskOfOtherProvers get the chunk/batch task assigned other provers
|
||||
func (o *ProverTask) GetValidOrAssignedTaskOfOtherProvers(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey string) ([]ProverTask, error) {
|
||||
// GetAssignedTaskOfOtherProvers get the chunk/batch task assigned other provers
|
||||
func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey string) ([]ProverTask, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("task_type", int(taskType))
|
||||
db = db.Where("task_id", taskID)
|
||||
db = db.Where("prover_public_key != ?", proverPublicKey)
|
||||
db = db.Where("proving_status in (?)", []int{int(types.ProverAssigned), int(types.ProverProofValid)})
|
||||
db = db.Where("proving_status = ?", int(types.ProverAssigned))
|
||||
|
||||
var proverTasks []ProverTask
|
||||
if err := db.Find(&proverTasks).Error; err != nil {
|
||||
|
||||
@@ -110,7 +110,7 @@ func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL stri
|
||||
}
|
||||
|
||||
func setEnv(t *testing.T) {
|
||||
version.Version = "v4.1.97-aaa-bbb-ccc"
|
||||
version.Version = "v4.1.98-aaa-bbb-ccc"
|
||||
|
||||
base = docker.NewDockerApp()
|
||||
base.RunDBImage(t)
|
||||
|
||||
Reference in New Issue
Block a user