|
|
|
|
@@ -29,8 +29,10 @@ type Collector struct {
|
|
|
|
|
chunkOrm *orm.Chunk
|
|
|
|
|
batchOrm *orm.Batch
|
|
|
|
|
|
|
|
|
|
timeoutCheckerRunTotal prometheus.Counter
|
|
|
|
|
proverTaskTimeoutTotal prometheus.Counter
|
|
|
|
|
timeoutBatchCheckerRunTotal prometheus.Counter
|
|
|
|
|
batchProverTaskTimeoutTotal prometheus.Counter
|
|
|
|
|
timeoutChunkCheckerRunTotal prometheus.Counter
|
|
|
|
|
chunkProverTaskTimeoutTotal prometheus.Counter
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewCollector creates a collector that periodically collects the data to send to the prover
|
|
|
|
|
@@ -44,17 +46,26 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom
|
|
|
|
|
chunkOrm: orm.NewChunk(db),
|
|
|
|
|
batchOrm: orm.NewBatch(db),
|
|
|
|
|
|
|
|
|
|
timeoutCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_timeout_checker_run_total",
|
|
|
|
|
Help: "Total number of timeout checker run.",
|
|
|
|
|
timeoutBatchCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_batch_timeout_checker_run_total",
|
|
|
|
|
Help: "Total number of batch timeout checker run.",
|
|
|
|
|
}),
|
|
|
|
|
proverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_prover_task_timeout_total",
|
|
|
|
|
Help: "Total number of timeout prover task.",
|
|
|
|
|
batchProverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_batch_prover_task_timeout_total",
|
|
|
|
|
Help: "Total number of batch timeout prover task.",
|
|
|
|
|
}),
|
|
|
|
|
timeoutChunkCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_chunk_timeout_checker_run_total",
|
|
|
|
|
Help: "Total number of chunk timeout checker run.",
|
|
|
|
|
}),
|
|
|
|
|
chunkProverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_chunk_prover_task_timeout_total",
|
|
|
|
|
Help: "Total number of chunk timeout prover task.",
|
|
|
|
|
}),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go c.timeoutProofTask()
|
|
|
|
|
go c.timeoutBatchProofTask()
|
|
|
|
|
go c.timeoutChunkProofTask()
|
|
|
|
|
|
|
|
|
|
log.Info("Start coordinator successfully.")
|
|
|
|
|
|
|
|
|
|
@@ -69,10 +80,10 @@ func (c *Collector) Stop() {
|
|
|
|
|
// timeoutTask periodically checks whether a sent task has timed out. If the timeout is reached, restore the
|
|
|
|
|
// chunk/batch task to unassigned, so that the batch/chunk collector can retry it.
|
|
|
|
|
|
|
|
|
|
func (c *Collector) timeoutProofTask() {
|
|
|
|
|
func (c *Collector) timeoutBatchProofTask() {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
nerr := fmt.Errorf("timeout proof task panic error:%v", err)
|
|
|
|
|
nerr := fmt.Errorf("timeout batch proof task panic error:%v", err)
|
|
|
|
|
log.Warn(nerr.Error())
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
@@ -81,52 +92,14 @@ func (c *Collector) timeoutProofTask() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
c.timeoutCheckerRunTotal.Inc()
|
|
|
|
|
timeout := time.Duration(c.cfg.ProverManager.CollectionTimeSec) * time.Second
|
|
|
|
|
assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, timeout)
|
|
|
|
|
c.timeoutBatchCheckerRunTotal.Inc()
|
|
|
|
|
timeout := time.Duration(c.cfg.ProverManager.BatchCollectionTimeSec) * time.Second
|
|
|
|
|
assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, message.ProofTypeBatch, timeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("get unassigned session info failure", "error", err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// here not update the block batch proving status failed, because the collector loop will check
|
|
|
|
|
// the attempt times. if reach the times, the collector will set the block batch proving status.
|
|
|
|
|
for _, assignedProverTask := range assignedProverTasks {
|
|
|
|
|
c.proverTaskTimeoutTotal.Inc()
|
|
|
|
|
log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
|
|
|
|
|
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
|
|
|
|
|
err = c.db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
// update prover task proving status as ProverProofInvalid
|
|
|
|
|
if err = c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
|
|
|
|
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
|
|
|
|
|
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update prover task failure type
|
|
|
|
|
if err = c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
|
|
|
|
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
|
|
|
|
|
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update the task to unassigned, let collector restart it
|
|
|
|
|
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
|
|
|
|
|
if err = c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
|
|
|
|
|
log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
|
|
|
|
|
if err = c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
|
|
|
|
|
log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("check task proof is timeout failure", "error", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
c.check(assignedProverTasks, c.batchProverTaskTimeoutTotal)
|
|
|
|
|
case <-c.ctx.Done():
|
|
|
|
|
if c.ctx.Err() != nil {
|
|
|
|
|
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
|
|
|
|
@@ -138,3 +111,77 @@ func (c *Collector) timeoutProofTask() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Collector) timeoutChunkProofTask() {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
nerr := fmt.Errorf("timeout proof chunk task panic error:%v", err)
|
|
|
|
|
log.Warn(nerr.Error())
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
ticker := time.NewTicker(time.Second * 2)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
c.timeoutChunkCheckerRunTotal.Inc()
|
|
|
|
|
timeout := time.Duration(c.cfg.ProverManager.ChunkCollectionTimeSec) * time.Second
|
|
|
|
|
assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, message.ProofTypeChunk, timeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("get unassigned session info failure", "error", err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
c.check(assignedProverTasks, c.chunkProverTaskTimeoutTotal)
|
|
|
|
|
|
|
|
|
|
case <-c.ctx.Done():
|
|
|
|
|
if c.ctx.Err() != nil {
|
|
|
|
|
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
case <-c.stopTimeoutChan:
|
|
|
|
|
log.Info("the coordinator run loop exit")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout prometheus.Counter) {
|
|
|
|
|
// here not update the block batch proving status failed, because the collector loop will check
|
|
|
|
|
// the attempt times. if reach the times, the collector will set the block batch proving status.
|
|
|
|
|
for _, assignedProverTask := range assignedProverTasks {
|
|
|
|
|
timeout.Inc()
|
|
|
|
|
log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
|
|
|
|
|
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
|
|
|
|
|
err := c.db.Transaction(func(tx *gorm.DB) error {
|
|
|
|
|
// update prover task proving status as ProverProofInvalid
|
|
|
|
|
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
|
|
|
|
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
|
|
|
|
|
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update prover task failure type
|
|
|
|
|
if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
|
|
|
|
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil {
|
|
|
|
|
log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update the task to unassigned, let collector restart it
|
|
|
|
|
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk {
|
|
|
|
|
if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
|
|
|
|
|
log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch {
|
|
|
|
|
if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil {
|
|
|
|
|
log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("check task proof is timeout failure", "error", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|