mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-12 15:38:18 -05:00
Compare commits
13 Commits
test/code
...
manager_ap
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50305f3039 | ||
|
|
dfc9a44743 | ||
|
|
08c49d9b2c | ||
|
|
ecb3f5a043 | ||
|
|
7d9e111e9c | ||
|
|
25e43462c6 | ||
|
|
74e0960dc5 | ||
|
|
76cfb97f99 | ||
|
|
6880dd83da | ||
|
|
9d6e53a120 | ||
|
|
0940788143 | ||
|
|
ad46a85a2d | ||
|
|
9d29a95675 |
@@ -25,6 +25,13 @@ type RollerAPI interface {
|
|||||||
SubmitProof(proof *message.ProofMsg) error
|
SubmitProof(proof *message.ProofMsg) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AdminAPI for Coordinator in order to manage process.
|
||||||
|
type AdminAPI interface {
|
||||||
|
StartSend()
|
||||||
|
PauseSend()
|
||||||
|
PauseSendUntil(batchIdx uint64)
|
||||||
|
}
|
||||||
|
|
||||||
// RequestToken generates and sends back register token for roller
|
// RequestToken generates and sends back register token for roller
|
||||||
func (m *Manager) RequestToken(authMsg *message.AuthMsg) (string, error) {
|
func (m *Manager) RequestToken(authMsg *message.AuthMsg) (string, error) {
|
||||||
if ok, err := authMsg.Verify(); !ok {
|
if ok, err := authMsg.Verify(); !ok {
|
||||||
@@ -127,3 +134,18 @@ func (m *Manager) SubmitProof(proof *message.ProofMsg) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartSend starts to send basic tasks.
|
||||||
|
func (m *Manager) StartSend() {
|
||||||
|
m.StartSendTask()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PauseSend pauses to send basic tasks.
|
||||||
|
func (m *Manager) PauseSend() {
|
||||||
|
m.PauseSendTask()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PauseSendUntil pause to send basic tasks until batchIdx.
|
||||||
|
func (m *Manager) PauseSendUntil(batchIdx uint64) {
|
||||||
|
m.PauseSendTaskUntil(batchIdx)
|
||||||
|
}
|
||||||
|
|||||||
@@ -17,7 +17,9 @@ const (
|
|||||||
|
|
||||||
// RollerManagerConfig loads sequencer configuration items.
|
// RollerManagerConfig loads sequencer configuration items.
|
||||||
type RollerManagerConfig struct {
|
type RollerManagerConfig struct {
|
||||||
CompressionLevel int `json:"compression_level,omitempty"`
|
PauseSendTask bool `json:"pause_send_task"`
|
||||||
|
PauseSendTaskUntil uint64 `json:"pause_send_task_until"`
|
||||||
|
CompressionLevel int `json:"compression_level,omitempty"`
|
||||||
// asc or desc (default: asc)
|
// asc or desc (default: asc)
|
||||||
OrderSession string `json:"order_session,omitempty"`
|
OrderSession string `json:"order_session,omitempty"`
|
||||||
// The amount of rollers to pick per proof generation session.
|
// The amount of rollers to pick per proof generation session.
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ require (
|
|||||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230508165858-27a3830afa61
|
github.com/scroll-tech/go-ethereum v1.10.14-0.20230508165858-27a3830afa61
|
||||||
github.com/stretchr/testify v1.8.2
|
github.com/stretchr/testify v1.8.2
|
||||||
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
|
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
|
||||||
|
go.uber.org/atomic v1.11.0
|
||||||
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
|
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
|
||||||
golang.org/x/sync v0.1.0
|
golang.org/x/sync v0.1.0
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -133,6 +133,8 @@ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRT
|
|||||||
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
|
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
|
||||||
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
|
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
|
||||||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||||
|
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||||
|
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/scroll-tech/go-ethereum/log"
|
"github.com/scroll-tech/go-ethereum/log"
|
||||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||||
"github.com/scroll-tech/go-ethereum/rpc"
|
"github.com/scroll-tech/go-ethereum/rpc"
|
||||||
|
uatomic "go.uber.org/atomic"
|
||||||
"golang.org/x/exp/rand"
|
"golang.org/x/exp/rand"
|
||||||
|
|
||||||
"scroll-tech/common/metrics"
|
"scroll-tech/common/metrics"
|
||||||
@@ -75,7 +76,9 @@ type Manager struct {
|
|||||||
cfg *config.RollerManagerConfig
|
cfg *config.RollerManagerConfig
|
||||||
|
|
||||||
// The indicator whether the backend is running or not.
|
// The indicator whether the backend is running or not.
|
||||||
running int32
|
running int32
|
||||||
|
sendTaskPaused *uatomic.Bool
|
||||||
|
pauseUntilBatchIdx *uatomic.Uint64
|
||||||
|
|
||||||
// A mutex guarding the boolean below.
|
// A mutex guarding the boolean below.
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@@ -117,6 +120,8 @@ func New(ctx context.Context, cfg *config.RollerManagerConfig, orm database.OrmF
|
|||||||
return &Manager{
|
return &Manager{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
sendTaskPaused: uatomic.NewBool(cfg.PauseSendTask),
|
||||||
|
pauseUntilBatchIdx: uatomic.NewUint64(cfg.PauseSendTaskUntil),
|
||||||
rollerPool: cmap.New(),
|
rollerPool: cmap.New(),
|
||||||
sessions: make(map[string]*session),
|
sessions: make(map[string]*session),
|
||||||
failedSessionInfos: make(map[string]*SessionInfo),
|
failedSessionInfos: make(map[string]*SessionInfo),
|
||||||
@@ -201,7 +206,13 @@ func (m *Manager) Loop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Select basic type roller and send message
|
// Select basic type roller and send message
|
||||||
for len(tasks) > 0 && m.StartBasicProofGenerationSession(tasks[0], nil) {
|
for len(tasks) > 0 {
|
||||||
|
if m.isSendTaskPaused(tasks[0].Index) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !m.StartBasicProofGenerationSession(tasks[0], nil) {
|
||||||
|
break
|
||||||
|
}
|
||||||
tasks = tasks[1:]
|
tasks = tasks[1:]
|
||||||
}
|
}
|
||||||
case <-m.ctx.Done():
|
case <-m.ctx.Done():
|
||||||
@@ -559,6 +570,11 @@ func (m *Manager) APIs() []rpc.API {
|
|||||||
Service: RollerAPI(m),
|
Service: RollerAPI(m),
|
||||||
Public: true,
|
Public: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Namespace: "admin",
|
||||||
|
Service: AdminAPI(m),
|
||||||
|
Public: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Namespace: "debug",
|
Namespace: "debug",
|
||||||
Public: true,
|
Public: true,
|
||||||
@@ -567,6 +583,26 @@ func (m *Manager) APIs() []rpc.API {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartSendTask starts to send basic tasks loop.
|
||||||
|
func (m *Manager) StartSendTask() {
|
||||||
|
m.sendTaskPaused.Store(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PauseSendTask pauses to send basic tasks loop.
|
||||||
|
func (m *Manager) PauseSendTask() {
|
||||||
|
m.sendTaskPaused.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PauseSendTaskUntil pauses to send basic tasks loop until batchIdx.
|
||||||
|
func (m *Manager) PauseSendTaskUntil(batchIdx uint64) {
|
||||||
|
m.PauseSendTask()
|
||||||
|
m.pauseUntilBatchIdx.Store(batchIdx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) isSendTaskPaused(batchIdx uint64) bool {
|
||||||
|
return m.sendTaskPaused.Load() && m.pauseUntilBatchIdx.Load() > batchIdx
|
||||||
|
}
|
||||||
|
|
||||||
// StartBasicProofGenerationSession starts a basic proof generation session
|
// StartBasicProofGenerationSession starts a basic proof generation session
|
||||||
func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
|
func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
|
||||||
var taskID string
|
var taskID string
|
||||||
|
|||||||
Reference in New Issue
Block a user