|
|
|
|
@@ -23,7 +23,10 @@ type Collector struct {
|
|
|
|
|
db *gorm.DB
|
|
|
|
|
ctx context.Context
|
|
|
|
|
|
|
|
|
|
stopTimeoutChan chan struct{}
|
|
|
|
|
stopChunkTimeoutChan chan struct{}
|
|
|
|
|
stopBatchTimeoutChan chan struct{}
|
|
|
|
|
stopBatchAllChunkReadyChan chan struct{}
|
|
|
|
|
stopCleanChallengeChan chan struct{}
|
|
|
|
|
|
|
|
|
|
proverTaskOrm *orm.ProverTask
|
|
|
|
|
chunkOrm *orm.Chunk
|
|
|
|
|
@@ -40,14 +43,17 @@ type Collector struct {
|
|
|
|
|
// NewCollector create a collector to cron collect the data to send to prover
|
|
|
|
|
func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prometheus.Registerer) *Collector {
|
|
|
|
|
c := &Collector{
|
|
|
|
|
cfg: cfg,
|
|
|
|
|
db: db,
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
stopTimeoutChan: make(chan struct{}),
|
|
|
|
|
proverTaskOrm: orm.NewProverTask(db),
|
|
|
|
|
chunkOrm: orm.NewChunk(db),
|
|
|
|
|
batchOrm: orm.NewBatch(db),
|
|
|
|
|
challenge: orm.NewChallenge(db),
|
|
|
|
|
cfg: cfg,
|
|
|
|
|
db: db,
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
stopChunkTimeoutChan: make(chan struct{}),
|
|
|
|
|
stopBatchTimeoutChan: make(chan struct{}),
|
|
|
|
|
stopBatchAllChunkReadyChan: make(chan struct{}),
|
|
|
|
|
stopCleanChallengeChan: make(chan struct{}),
|
|
|
|
|
proverTaskOrm: orm.NewProverTask(db),
|
|
|
|
|
chunkOrm: orm.NewChunk(db),
|
|
|
|
|
batchOrm: orm.NewBatch(db),
|
|
|
|
|
challenge: orm.NewChallenge(db),
|
|
|
|
|
|
|
|
|
|
timeoutBatchCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "coordinator_batch_timeout_checker_run_total",
|
|
|
|
|
@@ -83,7 +89,10 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom
|
|
|
|
|
|
|
|
|
|
// Stop all the collector
|
|
|
|
|
func (c *Collector) Stop() {
|
|
|
|
|
c.stopTimeoutChan <- struct{}{}
|
|
|
|
|
c.stopChunkTimeoutChan <- struct{}{}
|
|
|
|
|
c.stopBatchTimeoutChan <- struct{}{}
|
|
|
|
|
c.stopBatchAllChunkReadyChan <- struct{}{}
|
|
|
|
|
c.stopCleanChallengeChan <- struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// timeoutTask cron check the send task is timeout. if timeout reached, restore the
|
|
|
|
|
@@ -113,8 +122,8 @@ func (c *Collector) timeoutBatchProofTask() {
|
|
|
|
|
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
case <-c.stopTimeoutChan:
|
|
|
|
|
log.Info("the coordinator run loop exit")
|
|
|
|
|
case <-c.stopBatchTimeoutChan:
|
|
|
|
|
log.Info("the coordinator timeoutBatchProofTask run loop exit")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -146,8 +155,8 @@ func (c *Collector) timeoutChunkProofTask() {
|
|
|
|
|
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
case <-c.stopTimeoutChan:
|
|
|
|
|
log.Info("the coordinator run loop exit")
|
|
|
|
|
case <-c.stopChunkTimeoutChan:
|
|
|
|
|
log.Info("the coordinator timeoutChunkProofTask run loop exit")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -253,8 +262,8 @@ func (c *Collector) checkBatchAllChunkReady() {
|
|
|
|
|
log.Error("manager context canceled with error", "error", c.ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
case <-c.stopTimeoutChan:
|
|
|
|
|
log.Info("the coordinator run loop exit")
|
|
|
|
|
case <-c.stopBatchAllChunkReadyChan:
|
|
|
|
|
log.Info("the coordinator checkBatchAllChunkReady run loop exit")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|