Compare commits

..

2 Commits

Author SHA1 Message Date
colin
bad77eac2f feat(coordinator): prover monitoring (#392)
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-04-07 09:06:58 +08:00
Péter Garamvölgyi
5d761ad812 Make sure attempts can be deserialized from db on startup (#410) 2023-04-05 19:00:54 +02:00
8 changed files with 146 additions and 44 deletions

View File

@@ -23,11 +23,11 @@ import (
var (
bridgeL2BatchesGasOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/gas/over/threshold/total", metrics.ScrollRegistry)
bridgeL2BatchesTxsOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/txs/over/threshold/total", metrics.ScrollRegistry)
bridgeL2BatchesCommitTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commit/total", metrics.ScrollRegistry)
bridgeL2BatchesBlocksCreatedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/blocks/created/total", metrics.ScrollRegistry)
bridgeL2BatchesCommitsSentTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commits/sent/total", metrics.ScrollRegistry)
bridgeL2BatchesCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/blocks/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesTxsCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/txs/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesGasCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/gas/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesTxsCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/txs/created/per/batch", metrics.ScrollRegistry)
bridgeL2BatchesGasCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/gas/created/per/batch", metrics.ScrollRegistry)
)
// AddBatchInfoToDB inserts the batch information to the BlockBatch table and updates the batch_hash
@@ -257,7 +257,7 @@ func (p *BatchProposer) TryCommitBatches() {
log.Error("SendCommitTx failed", "error", err)
} else {
// pop the processed batches from the buffer
bridgeL2BatchesCommitTotalCounter.Inc(1)
bridgeL2BatchesCommitsSentTotalCounter.Inc(1)
p.batchDataBuffer = p.batchDataBuffer[index:]
}
}
@@ -273,9 +273,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(1)
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
}
return true
}
@@ -286,9 +286,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(1)
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
}
return true
}
@@ -316,9 +316,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks); err != nil {
log.Error("failed to create batch", "from", blocks[0].Number, "to", blocks[len(blocks)-1].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(txNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(gasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(int64(len(blocks)))
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(txNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(gasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(int64(len(blocks)))
}
return true

View File

@@ -28,8 +28,8 @@ import (
// Metrics
var (
bridgeL2MsgsSyncHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/msgs/sync/height", metrics.ScrollRegistry)
bridgeL2TracesFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/height", metrics.ScrollRegistry)
bridgeL2TracesFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/gap", metrics.ScrollRegistry)
bridgeL2BlocksFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/height", metrics.ScrollRegistry)
bridgeL2BlocksFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/gap", metrics.ScrollRegistry)
bridgeL2MsgsSentEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/sent/events/total", metrics.ScrollRegistry)
bridgeL2MsgsAppendEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/append/events/total", metrics.ScrollRegistry)
@@ -162,8 +162,8 @@ func (w *L2WatcherClient) TryFetchRunningMissingBlocks(ctx context.Context, bloc
log.Error("fail to getAndStoreBlockTraces", "from", from, "to", to, "err", err)
return
}
bridgeL2TracesFetchedHeightGauge.Update(int64(to))
bridgeL2TracesFetchedGapGauge.Update(int64(blockHeight - to))
bridgeL2BlocksFetchedHeightGauge.Update(int64(to))
bridgeL2BlocksFetchedGapGauge.Update(int64(blockHeight - to))
}
}

View File

@@ -162,7 +162,7 @@ type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts"`
Attempts uint8 `json:"attempts,omitempty"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v3.0.5"
var tag = "v3.0.7"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -117,6 +117,8 @@ func (m *Manager) SubmitProof(proof *message.ProofMsg) (bool, error) {
return false, fmt.Errorf("the roller or session id doesn't exist, pubkey: %s, ID: %s", pubkey, proof.ID)
}
m.updateMetricRollerProofsLastFinishedTimestampGauge(pubkey)
err := m.handleZkProof(pubkey, proof.ProofDetail)
if err != nil {
return false, err

View File

@@ -30,10 +30,19 @@ import (
)
var (
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/failed/total", metrics.ScrollRegistry)
// proofs
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedSuccessTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/success/time", metrics.ScrollRegistry)
coordinatorProofsVerifiedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/failed/time", metrics.ScrollRegistry)
coordinatorProofsGeneratedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/generated/failed/time", metrics.ScrollRegistry)
// sessions
coordinatorSessionsSuccessTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/success/total", metrics.ScrollRegistry)
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
coordinatorSessionsFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/failed/total", metrics.ScrollRegistry)
coordinatorSessionsActiveNumberGauge = geth_metrics.NewRegisteredCounter("coordinator/sessions/active/number", metrics.ScrollRegistry)
)
const (
@@ -242,7 +251,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if !ok {
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
}
proofTimeSec := uint64(time.Since(time.Unix(sess.info.StartTimestamp, 0)).Seconds())
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
proofTimeSec := uint64(proofTime.Seconds())
// Ensure this roller is eligible to participate in the session.
roller, ok := sess.info.Rollers[pk]
@@ -267,6 +277,7 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof time", proofTimeSec,
)
defer func() {
@@ -287,11 +298,13 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
}()
if msg.Status != message.StatusOk {
log.Error(
"Roller failed to generate proof",
"msg.ID", msg.ID,
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
log.Info(
"proof generated by roller failed",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof time", proofTimeSec,
"error", msg.Error,
)
return nil
@@ -329,15 +342,24 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
"error", dbErr)
return dbErr
}
coordinatorProofsVerifiedTotalCounter.Inc(1)
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
} else {
coordinatorProofsVerifiedFailedTotalCounter.Inc(1)
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
}
return nil
}
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
coordinatorSessionsActiveNumberGauge.Inc(1)
defer coordinatorSessionsActiveNumberGauge.Dec(1)
for {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
@@ -371,6 +393,7 @@ func (m *Manager) CollectProofs(sess *session) {
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsTimeoutTotalCounter.Inc(1)
return
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
@@ -381,6 +404,7 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
}
coordinatorSessionsFailedTotalCounter.Inc(1)
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
@@ -400,6 +424,8 @@ func (m *Manager) CollectProofs(sess *session) {
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsSuccessTotalCounter.Inc(1)
return
}
m.mu.Unlock()
@@ -517,6 +543,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSessio
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
@@ -545,17 +572,18 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSessio
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "error", err)
for _, roller := range sess.info.Rollers {
log.Error(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
}

View File

@@ -0,0 +1,60 @@
package coordinator
import (
"time"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
)
type rollerMetrics struct {
rollerProofsVerifiedSuccessTimeTimer geth_metrics.Timer
rollerProofsVerifiedFailedTimeTimer geth_metrics.Timer
rollerProofsGeneratedFailedTimeTimer geth_metrics.Timer
rollerProofsLastAssignedTimestampGauge geth_metrics.Gauge
rollerProofsLastFinishedTimestampGauge geth_metrics.Gauge
}
func (m *Manager) updateMetricRollerProofsLastFinishedTimestampGauge(pk string) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsLastFinishedTimestampGauge.Update(time.Now().Unix())
} else {
log.Error("rollerProofsLastFinishedTimestampGauge is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsLastAssignedTimestampGauge(pk string) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsLastAssignedTimestampGauge.Update(time.Now().Unix())
} else {
log.Error("rollerProofsLastAssignedTimestampGauge is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsVerifiedSuccessTimeTimer(pk string, d time.Duration) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsVerifiedSuccessTimeTimer.Update(d)
} else {
log.Error("rollerProofsVerifiedSuccessTimeTimer is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsVerifiedFailedTimeTimer(pk string, d time.Duration) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsVerifiedFailedTimeTimer.Update(d)
} else {
log.Error("rollerProofsVerifiedFailedTimeTimer is nil", "roller pk", pk)
}
}
}

View File

@@ -9,8 +9,10 @@ import (
cmap "github.com/orcaman/concurrent-map"
geth_types "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"scroll-tech/common/message"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
)
@@ -30,6 +32,8 @@ type rollerNode struct {
// Time of message creation
registerTime time.Time
*rollerMetrics
}
func (r *rollerNode) sendTask(id string, traces []*geth_types.BlockTrace) bool {
@@ -64,12 +68,20 @@ func (m *Manager) register(pubkey string, identity *message.Identity) (<-chan *m
node, ok := m.rollerPool.Get(pubkey)
if !ok {
taskIDs := m.reloadRollerAssignedTasks(pubkey)
rMs := &rollerMetrics{
rollerProofsVerifiedSuccessTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/success/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsVerifiedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/failed/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsGeneratedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/generated/failed/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsLastAssignedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/assigned/timestamp/%s", pubkey), metrics.ScrollRegistry),
rollerProofsLastFinishedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/finished/timestamp/%s", pubkey), metrics.ScrollRegistry),
}
node = &rollerNode{
Name: identity.Name,
Version: identity.Version,
PublicKey: pubkey,
TaskIDs: *taskIDs,
taskChan: make(chan *message.TaskMsg, 4),
Name: identity.Name,
Version: identity.Version,
PublicKey: pubkey,
TaskIDs: *taskIDs,
taskChan: make(chan *message.TaskMsg, 4),
rollerMetrics: rMs,
}
m.rollerPool.Set(pubkey, node)
}