mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 23:48:15 -05:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bad77eac2f |
@@ -23,11 +23,11 @@ import (
|
||||
var (
|
||||
bridgeL2BatchesGasOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/gas/over/threshold/total", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesTxsOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/txs/over/threshold/total", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesCommitTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commit/total", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesBlocksCreatedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/blocks/created/total", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesCommitsSentTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commits/sent/total", metrics.ScrollRegistry)
|
||||
|
||||
bridgeL2BatchesCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/blocks/created/rate", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesTxsCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/txs/created/rate", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesGasCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/gas/created/rate", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesTxsCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/txs/created/per/batch", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesGasCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/gas/created/per/batch", metrics.ScrollRegistry)
|
||||
)
|
||||
|
||||
// AddBatchInfoToDB inserts the batch information to the BlockBatch table and updates the batch_hash
|
||||
@@ -257,7 +257,7 @@ func (p *BatchProposer) TryCommitBatches() {
|
||||
log.Error("SendCommitTx failed", "error", err)
|
||||
} else {
|
||||
// pop the processed batches from the buffer
|
||||
bridgeL2BatchesCommitTotalCounter.Inc(1)
|
||||
bridgeL2BatchesCommitsSentTotalCounter.Inc(1)
|
||||
p.batchDataBuffer = p.batchDataBuffer[index:]
|
||||
}
|
||||
}
|
||||
@@ -273,9 +273,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
|
||||
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
|
||||
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
|
||||
} else {
|
||||
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
|
||||
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
|
||||
bridgeL2BatchesCreatedRateMeter.Mark(1)
|
||||
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
|
||||
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
|
||||
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -286,9 +286,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
|
||||
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
|
||||
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
|
||||
} else {
|
||||
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
|
||||
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
|
||||
bridgeL2BatchesCreatedRateMeter.Mark(1)
|
||||
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
|
||||
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
|
||||
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -316,9 +316,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
|
||||
if err := p.createBatchForBlocks(blocks); err != nil {
|
||||
log.Error("failed to create batch", "from", blocks[0].Number, "to", blocks[len(blocks)-1].Number, "err", err)
|
||||
} else {
|
||||
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(txNum))
|
||||
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(gasUsed))
|
||||
bridgeL2BatchesCreatedRateMeter.Mark(int64(len(blocks)))
|
||||
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(txNum))
|
||||
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(gasUsed))
|
||||
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(int64(len(blocks)))
|
||||
}
|
||||
|
||||
return true
|
||||
|
||||
@@ -28,8 +28,8 @@ import (
|
||||
// Metrics
|
||||
var (
|
||||
bridgeL2MsgsSyncHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/msgs/sync/height", metrics.ScrollRegistry)
|
||||
bridgeL2TracesFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/height", metrics.ScrollRegistry)
|
||||
bridgeL2TracesFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/gap", metrics.ScrollRegistry)
|
||||
bridgeL2BlocksFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/height", metrics.ScrollRegistry)
|
||||
bridgeL2BlocksFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/gap", metrics.ScrollRegistry)
|
||||
|
||||
bridgeL2MsgsSentEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/sent/events/total", metrics.ScrollRegistry)
|
||||
bridgeL2MsgsAppendEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/append/events/total", metrics.ScrollRegistry)
|
||||
@@ -162,8 +162,8 @@ func (w *L2WatcherClient) TryFetchRunningMissingBlocks(ctx context.Context, bloc
|
||||
log.Error("fail to getAndStoreBlockTraces", "from", from, "to", to, "err", err)
|
||||
return
|
||||
}
|
||||
bridgeL2TracesFetchedHeightGauge.Update(int64(to))
|
||||
bridgeL2TracesFetchedGapGauge.Update(int64(blockHeight - to))
|
||||
bridgeL2BlocksFetchedHeightGauge.Update(int64(to))
|
||||
bridgeL2BlocksFetchedGapGauge.Update(int64(blockHeight - to))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v3.0.6"
|
||||
var tag = "v3.0.7"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -117,6 +117,8 @@ func (m *Manager) SubmitProof(proof *message.ProofMsg) (bool, error) {
|
||||
return false, fmt.Errorf("the roller or session id doesn't exist, pubkey: %s, ID: %s", pubkey, proof.ID)
|
||||
}
|
||||
|
||||
m.updateMetricRollerProofsLastFinishedTimestampGauge(pubkey)
|
||||
|
||||
err := m.handleZkProof(pubkey, proof.ProofDetail)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
||||
@@ -30,10 +30,19 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
|
||||
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
|
||||
coordinatorProofsVerifiedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/total", metrics.ScrollRegistry)
|
||||
coordinatorProofsVerifiedFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/failed/total", metrics.ScrollRegistry)
|
||||
// proofs
|
||||
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
|
||||
|
||||
coordinatorProofsVerifiedSuccessTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/success/time", metrics.ScrollRegistry)
|
||||
coordinatorProofsVerifiedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/failed/time", metrics.ScrollRegistry)
|
||||
coordinatorProofsGeneratedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/generated/failed/time", metrics.ScrollRegistry)
|
||||
|
||||
// sessions
|
||||
coordinatorSessionsSuccessTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/success/total", metrics.ScrollRegistry)
|
||||
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
|
||||
coordinatorSessionsFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/failed/total", metrics.ScrollRegistry)
|
||||
|
||||
coordinatorSessionsActiveNumberGauge = geth_metrics.NewRegisteredCounter("coordinator/sessions/active/number", metrics.ScrollRegistry)
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -242,7 +251,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
|
||||
}
|
||||
proofTimeSec := uint64(time.Since(time.Unix(sess.info.StartTimestamp, 0)).Seconds())
|
||||
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
|
||||
proofTimeSec := uint64(proofTime.Seconds())
|
||||
|
||||
// Ensure this roller is eligible to participate in the session.
|
||||
roller, ok := sess.info.Rollers[pk]
|
||||
@@ -267,6 +277,7 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
"proof id", msg.ID,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"proof time", proofTimeSec,
|
||||
)
|
||||
|
||||
defer func() {
|
||||
@@ -287,11 +298,13 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
}()
|
||||
|
||||
if msg.Status != message.StatusOk {
|
||||
log.Error(
|
||||
"Roller failed to generate proof",
|
||||
"msg.ID", msg.ID,
|
||||
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
|
||||
log.Info(
|
||||
"proof generated by roller failed",
|
||||
"proof id", msg.ID,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"proof time", proofTimeSec,
|
||||
"error", msg.Error,
|
||||
)
|
||||
return nil
|
||||
@@ -329,15 +342,24 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
"error", dbErr)
|
||||
return dbErr
|
||||
}
|
||||
coordinatorProofsVerifiedTotalCounter.Inc(1)
|
||||
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
|
||||
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
|
||||
} else {
|
||||
coordinatorProofsVerifiedFailedTotalCounter.Inc(1)
|
||||
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
|
||||
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
|
||||
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CollectProofs collects proofs corresponding to a proof generation session.
|
||||
func (m *Manager) CollectProofs(sess *session) {
|
||||
coordinatorSessionsActiveNumberGauge.Inc(1)
|
||||
defer coordinatorSessionsActiveNumberGauge.Dec(1)
|
||||
|
||||
for {
|
||||
select {
|
||||
//Execute after timeout, set in config.json. Consider all rollers failed.
|
||||
@@ -371,6 +393,7 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
coordinatorSessionsTimeoutTotalCounter.Inc(1)
|
||||
return
|
||||
|
||||
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
|
||||
@@ -381,6 +404,7 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
|
||||
}
|
||||
coordinatorSessionsFailedTotalCounter.Inc(1)
|
||||
}
|
||||
if err := m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "pk", ret.pk, "error", err)
|
||||
@@ -400,6 +424,8 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
|
||||
coordinatorSessionsSuccessTotalCounter.Inc(1)
|
||||
return
|
||||
}
|
||||
m.mu.Unlock()
|
||||
@@ -517,6 +543,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSessio
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
|
||||
continue
|
||||
}
|
||||
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
|
||||
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
|
||||
}
|
||||
// No roller assigned.
|
||||
@@ -545,17 +572,18 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSessio
|
||||
sess.info.Attempts += prevSession.info.Attempts
|
||||
}
|
||||
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Info(
|
||||
"assigned proof to roller",
|
||||
"session id", sess.info.ID,
|
||||
"roller name", roller.Name,
|
||||
"roller pk", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "error", err)
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Error(
|
||||
"restore roller info for session",
|
||||
"session id", sess.info.ID,
|
||||
"roller name", roller.Name,
|
||||
"public key", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
60
coordinator/roller_metrics.go
Normal file
60
coordinator/roller_metrics.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package coordinator
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
type rollerMetrics struct {
|
||||
rollerProofsVerifiedSuccessTimeTimer geth_metrics.Timer
|
||||
rollerProofsVerifiedFailedTimeTimer geth_metrics.Timer
|
||||
rollerProofsGeneratedFailedTimeTimer geth_metrics.Timer
|
||||
rollerProofsLastAssignedTimestampGauge geth_metrics.Gauge
|
||||
rollerProofsLastFinishedTimestampGauge geth_metrics.Gauge
|
||||
}
|
||||
|
||||
func (m *Manager) updateMetricRollerProofsLastFinishedTimestampGauge(pk string) {
|
||||
if node, ok := m.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).rollerMetrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsLastFinishedTimestampGauge.Update(time.Now().Unix())
|
||||
} else {
|
||||
log.Error("rollerProofsLastFinishedTimestampGauge is nil", "roller pk", pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) updateMetricRollerProofsLastAssignedTimestampGauge(pk string) {
|
||||
if node, ok := m.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).rollerMetrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsLastAssignedTimestampGauge.Update(time.Now().Unix())
|
||||
} else {
|
||||
log.Error("rollerProofsLastAssignedTimestampGauge is nil", "roller pk", pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) updateMetricRollerProofsVerifiedSuccessTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := m.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).rollerMetrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsVerifiedSuccessTimeTimer.Update(d)
|
||||
} else {
|
||||
log.Error("rollerProofsVerifiedSuccessTimeTimer is nil", "roller pk", pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) updateMetricRollerProofsVerifiedFailedTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := m.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).rollerMetrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsVerifiedFailedTimeTimer.Update(d)
|
||||
} else {
|
||||
log.Error("rollerProofsVerifiedFailedTimeTimer is nil", "roller pk", pk)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,8 +9,10 @@ import (
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
geth_types "github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
|
||||
"scroll-tech/common/message"
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
)
|
||||
|
||||
@@ -30,6 +32,8 @@ type rollerNode struct {
|
||||
|
||||
// Time of message creation
|
||||
registerTime time.Time
|
||||
|
||||
*rollerMetrics
|
||||
}
|
||||
|
||||
func (r *rollerNode) sendTask(id string, traces []*geth_types.BlockTrace) bool {
|
||||
@@ -64,12 +68,20 @@ func (m *Manager) register(pubkey string, identity *message.Identity) (<-chan *m
|
||||
node, ok := m.rollerPool.Get(pubkey)
|
||||
if !ok {
|
||||
taskIDs := m.reloadRollerAssignedTasks(pubkey)
|
||||
rMs := &rollerMetrics{
|
||||
rollerProofsVerifiedSuccessTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/success/time/%s", pubkey), metrics.ScrollRegistry),
|
||||
rollerProofsVerifiedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/failed/time/%s", pubkey), metrics.ScrollRegistry),
|
||||
rollerProofsGeneratedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/generated/failed/time/%s", pubkey), metrics.ScrollRegistry),
|
||||
rollerProofsLastAssignedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/assigned/timestamp/%s", pubkey), metrics.ScrollRegistry),
|
||||
rollerProofsLastFinishedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/finished/timestamp/%s", pubkey), metrics.ScrollRegistry),
|
||||
}
|
||||
node = &rollerNode{
|
||||
Name: identity.Name,
|
||||
Version: identity.Version,
|
||||
PublicKey: pubkey,
|
||||
TaskIDs: *taskIDs,
|
||||
taskChan: make(chan *message.TaskMsg, 4),
|
||||
Name: identity.Name,
|
||||
Version: identity.Version,
|
||||
PublicKey: pubkey,
|
||||
TaskIDs: *taskIDs,
|
||||
taskChan: make(chan *message.TaskMsg, 4),
|
||||
rollerMetrics: rMs,
|
||||
}
|
||||
m.rollerPool.Set(pubkey, node)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user