scroll/coordinator/rollers.go

package coordinator

import (
	"crypto/rand"
	"fmt"
	"math/big"
	"time"

	cmap "github.com/orcaman/concurrent-map"
	geth_types "github.com/scroll-tech/go-ethereum/core/types"
	"github.com/scroll-tech/go-ethereum/log"

	"scroll-tech/common/message"
	"scroll-tech/common/types"
)

// rollerNode records the roller status and sends tasks to the connected roller.
type rollerNode struct {
	// Roller name
	Name string
	// Roller public key
	PublicKey string
	// Roller version
	Version string
	// task channel
	taskChan chan *message.TaskMsg
	// session IDs of the tasks delivered to this roller.
	TaskIDs cmap.ConcurrentMap
	// Time of the last registration.
	registerTime time.Time
}
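
// sendTask pushes a task onto the roller's channel without blocking and
// records the task ID as assigned; it returns false when the channel buffer
// is full.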
func (r *rollerNode) sendTask(id string, traces []*geth_types.BlockTrace) bool {
	select {
	case r.taskChan <- &message.TaskMsg{
		ID:     id,
		Traces: traces,
	}:
		r.TaskIDs.Set(id, struct{}{})
	default:
		log.Warn("roller channel is full", "roller name", r.Name, "public key", r.PublicKey)
		return false
	}
	return true
}
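
// reloadRollerAssignedTasks scans the live sessions and rebuilds the set of
// task IDs still assigned to the given roller, so that a reconnecting roller
// keeps its unfinished work.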
func (m *Manager) reloadRollerAssignedTasks(pubkey string) *cmap.ConcurrentMap {
	m.mu.RLock()
	defer m.mu.RUnlock()
	taskIDs := cmap.New()
	for id, sess := range m.sessions {
		for pk, roller := range sess.info.Rollers {
			if pk == pubkey && roller.Status == types.RollerAssigned {
				taskIDs.Set(id, struct{}{})
			}
		}
	}
	return &taskIDs
}
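
// register adds the roller to the pool, restoring any task IDs it was
// already assigned, and returns the channel on which it will receive tasks.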
func (m *Manager) register(pubkey string, identity *message.Identity) (<-chan *message.TaskMsg, error) {
	node, ok := m.rollerPool.Get(pubkey)
	if !ok {
		taskIDs := m.reloadRollerAssignedTasks(pubkey)
		node = &rollerNode{
			Name:      identity.Name,
			Version:   identity.Version,
			PublicKey: pubkey,
			TaskIDs:   *taskIDs,
			taskChan:  make(chan *message.TaskMsg, 4),
		}
		m.rollerPool.Set(pubkey, node)
	}
	roller := node.(*rollerNode)
	// avoid reconnecting too frequently.
	if time.Since(roller.registerTime) < 60*time.Second {
		log.Warn("roller reconnect too frequently", "roller_name", identity.Name, "public key", pubkey)
		return nil, fmt.Errorf("roller reconnect too frequently")
	}
	// update register time and status
	roller.registerTime = time.Now()
	return roller.taskChan, nil
}
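
// freeRoller removes the roller from the pool.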
func (m *Manager) freeRoller(pk string) {
	m.rollerPool.Pop(pk)
}
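
// existTaskIDForRoller reports whether the given task ID is currently
// assigned to the roller identified by pk.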
func (m *Manager) existTaskIDForRoller(pk string, id string) bool {
	if node, ok := m.rollerPool.Get(pk); ok {
		r := node.(*rollerNode)
		return r.TaskIDs.Has(id)
	}
	return false
}
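
// freeTaskIDForRoller drops the task ID from the roller's assigned set.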
func (m *Manager) freeTaskIDForRoller(pk string, id string) {
	if node, ok := m.rollerPool.Get(pk); ok {
		r := node.(*rollerNode)
		r.TaskIDs.Pop(id)
	}
}

// GetNumberOfIdleRollers returns the count of idle rollers, i.e. rollers
// with no assigned tasks.
func (m *Manager) GetNumberOfIdleRollers() (count int) {
	for i, pk := range m.rollerPool.Keys() {
		if val, ok := m.rollerPool.Get(pk); ok {
			r := val.(*rollerNode)
			if r.TaskIDs.Count() == 0 {
				count++
			}
		} else {
			log.Error("rollerPool Get fail", "pk", pk, "idx", i, "pk len", len(pk))
		}
	}
	return count
}
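
// selectRoller picks an idle roller uniformly at random. Each draw that
// lands on a busy (or missing) roller is removed from the candidate slice by
// swapping in the head element and shrinking the slice, until an idle roller
// is found or no candidates remain.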
func (m *Manager) selectRoller() *rollerNode {
	pubkeys := m.rollerPool.Keys()
	for len(pubkeys) > 0 {
		idx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(pubkeys))))
		if val, ok := m.rollerPool.Get(pubkeys[idx.Int64()]); ok {
			r := val.(*rollerNode)
			if r.TaskIDs.Count() == 0 {
				return r
			}
		} else {
			log.Error("rollerPool Get fail", "pk", pubkeys[idx.Int64()], "idx", idx.Int64(), "pk len", len(pubkeys))
		}
		pubkeys[idx.Int64()], pubkeys = pubkeys[0], pubkeys[1:]
	}
	return nil
}