mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
feat(bridge): update the watcher and relayer based on the new contract (#305)
Co-authored-by: colinlyguo <651734127@qq.com> Co-authored-by: zimpha <zimpha@gmail.com> Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com> Co-authored-by: HAOYUatHZ <haoyu@protonmail.com>
This commit is contained in:
@@ -4,7 +4,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
"scroll-tech/common/types"
|
||||
)
|
||||
|
||||
// RollerDebugAPI roller api interface in order go get debug message.
|
||||
@@ -63,7 +63,7 @@ func (m *Manager) ListRollers() ([]*RollerInfo, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func newSessionInfo(sess *session, status orm.ProvingStatus, errMsg string, finished bool) *SessionInfo {
|
||||
func newSessionInfo(sess *session, status types.ProvingStatus, errMsg string, finished bool) *SessionInfo {
|
||||
now := time.Now()
|
||||
var nameList []string
|
||||
for pk := range sess.info.Rollers {
|
||||
@@ -90,7 +90,7 @@ func (m *Manager) GetSessionInfo(sessionID string) (*SessionInfo, error) {
|
||||
return info, nil
|
||||
}
|
||||
if s, ok := m.sessions[sessionID]; ok {
|
||||
return newSessionInfo(s, orm.ProvingTaskAssigned, "", false), nil
|
||||
return newSessionInfo(s, types.ProvingTaskAssigned, "", false), nil
|
||||
}
|
||||
return nil, fmt.Errorf("no such session, sessionID: %s", sessionID)
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
|
||||
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
|
||||
@@ -12,15 +12,15 @@ import (
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
geth_types "github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
|
||||
"scroll-tech/common/message"
|
||||
"scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/database"
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
"scroll-tech/coordinator/config"
|
||||
"scroll-tech/coordinator/verifier"
|
||||
@@ -33,12 +33,12 @@ const (
|
||||
type rollerProofStatus struct {
|
||||
id string
|
||||
pk string
|
||||
status orm.RollerProveStatus
|
||||
status types.RollerProveStatus
|
||||
}
|
||||
|
||||
// Contains all the information on an ongoing proof generation session.
|
||||
type session struct {
|
||||
info *orm.SessionInfo
|
||||
info *types.SessionInfo
|
||||
// finish channel is used to pass the public key of the rollers who finished proving process.
|
||||
finishChan chan rollerProofStatus
|
||||
}
|
||||
@@ -136,8 +136,8 @@ func (m *Manager) isRunning() bool {
|
||||
// Loop keeps the manager running.
|
||||
func (m *Manager) Loop() {
|
||||
var (
|
||||
tick = time.NewTicker(time.Second * 3)
|
||||
tasks []*orm.BlockBatch
|
||||
tick = time.NewTicker(time.Second * 2)
|
||||
tasks []*types.BlockBatch
|
||||
)
|
||||
defer tick.Stop()
|
||||
|
||||
@@ -148,7 +148,7 @@ func (m *Manager) Loop() {
|
||||
var err error
|
||||
// TODO: add cache
|
||||
if tasks, err = m.orm.GetBlockBatches(
|
||||
map[string]interface{}{"proving_status": orm.ProvingTaskUnassigned},
|
||||
map[string]interface{}{"proving_status": types.ProvingTaskUnassigned},
|
||||
fmt.Sprintf(
|
||||
"ORDER BY index %s LIMIT %d;",
|
||||
m.cfg.OrderSession,
|
||||
@@ -183,9 +183,9 @@ func (m *Manager) restorePrevSessions() {
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if ids, err := m.orm.GetAssignedBatchIDs(); err != nil {
|
||||
log.Error("failed to get assigned batch ids from db", "error", err)
|
||||
} else if prevSessions, err := m.orm.GetSessionInfosByIDs(ids); err != nil {
|
||||
if hashes, err := m.orm.GetAssignedBatchHashes(); err != nil {
|
||||
log.Error("failed to get assigned batch hashes from db", "error", err)
|
||||
} else if prevSessions, err := m.orm.GetSessionInfosByHashes(hashes); err != nil {
|
||||
log.Error("failed to recover roller session info from db", "error", err)
|
||||
} else {
|
||||
for _, v := range prevSessions {
|
||||
@@ -233,7 +233,7 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("roller %s (%s) is not eligible to partake in proof session %v", roller.Name, roller.PublicKey, msg.ID)
|
||||
}
|
||||
if roller.Status == orm.RollerProofValid {
|
||||
if roller.Status == types.RollerProofValid {
|
||||
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
|
||||
// TODO: Defend invalid proof resubmissions by one of the following two methods:
|
||||
// (i) slash the roller for each submission of invalid proof
|
||||
@@ -257,14 +257,14 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
// TODO: maybe we should use db tx for the whole process?
|
||||
// Roll back current proof's status.
|
||||
if dbErr != nil {
|
||||
if err := m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskUnassigned); err != nil {
|
||||
if err := m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("fail to reset task status as Unassigned", "msg.ID", msg.ID)
|
||||
}
|
||||
}
|
||||
// set proof status
|
||||
status := orm.RollerProofInvalid
|
||||
status := types.RollerProofInvalid
|
||||
if success && dbErr == nil {
|
||||
status = orm.RollerProofValid
|
||||
status = types.RollerProofValid
|
||||
}
|
||||
// notify the session that the roller finishes the proving process
|
||||
sess.finishChan <- rollerProofStatus{msg.ID, pk, status}
|
||||
@@ -282,17 +282,17 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
}
|
||||
|
||||
// store proof content
|
||||
if dbErr = m.orm.UpdateProofByID(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil {
|
||||
if dbErr = m.orm.UpdateProofByHash(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil {
|
||||
log.Error("failed to store proof into db", "error", dbErr)
|
||||
return dbErr
|
||||
}
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskProved); dbErr != nil {
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskProved); dbErr != nil {
|
||||
log.Error("failed to update task status as proved", "error", dbErr)
|
||||
return dbErr
|
||||
}
|
||||
|
||||
var err error
|
||||
tasks, err := m.orm.GetBlockBatches(map[string]interface{}{"id": msg.ID})
|
||||
tasks, err := m.orm.GetBlockBatches(map[string]interface{}{"hash": msg.ID})
|
||||
if len(tasks) == 0 {
|
||||
if err != nil {
|
||||
log.Error("failed to get tasks", "error", err)
|
||||
@@ -311,11 +311,11 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
}
|
||||
|
||||
if success {
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskVerified); dbErr != nil {
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskVerified); dbErr != nil {
|
||||
log.Error(
|
||||
"failed to update proving_status",
|
||||
"msg.ID", msg.ID,
|
||||
"status", orm.ProvingTaskVerified,
|
||||
"status", types.ProvingTaskVerified,
|
||||
"error", dbErr)
|
||||
}
|
||||
return dbErr
|
||||
@@ -342,7 +342,7 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
// First, round up the keys that actually sent in a valid proof.
|
||||
var participatingRollers []string
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if roller.Status == orm.RollerProofValid {
|
||||
if roller.Status == types.RollerProofValid {
|
||||
participatingRollers = append(participatingRollers, pk)
|
||||
}
|
||||
}
|
||||
@@ -356,7 +356,7 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
// Note that this is only a workaround for testnet here.
|
||||
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
|
||||
// so as to re-distribute the task in the future
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
}
|
||||
return
|
||||
@@ -372,7 +372,7 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
m.mu.Lock()
|
||||
sess.info.Rollers[ret.pk].Status = ret.status
|
||||
if m.isSessionFailed(sess.info) {
|
||||
if err := m.orm.UpdateProvingStatus(ret.id, orm.ProvingTaskFailed); err != nil {
|
||||
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
|
||||
}
|
||||
}
|
||||
@@ -384,9 +384,9 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) isSessionFailed(info *orm.SessionInfo) bool {
|
||||
func (m *Manager) isSessionFailed(info *types.SessionInfo) bool {
|
||||
for _, roller := range info.Rollers {
|
||||
if roller.Status != orm.RollerProofInvalid {
|
||||
if roller.Status != types.RollerProofInvalid {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -410,32 +410,32 @@ func (m *Manager) APIs() []rpc.API {
|
||||
}
|
||||
|
||||
// StartProofGenerationSession starts a proof generation session
|
||||
func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success bool) {
|
||||
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
|
||||
if m.GetNumberOfIdleRollers() == 0 {
|
||||
log.Warn("no idle roller when starting proof generation session", "id", task.ID)
|
||||
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
|
||||
return false
|
||||
}
|
||||
|
||||
log.Info("start proof generation session", "id", task.ID)
|
||||
log.Info("start proof generation session", "id", task.Hash)
|
||||
defer func() {
|
||||
if !success {
|
||||
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", task.ID, "err", err)
|
||||
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Get block traces.
|
||||
blockInfos, err := m.orm.GetBlockInfos(map[string]interface{}{"batch_id": task.ID})
|
||||
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
|
||||
if err != nil {
|
||||
log.Error(
|
||||
"could not GetBlockInfos",
|
||||
"batch_id", task.ID,
|
||||
"batch_hash", task.Hash,
|
||||
"error", err,
|
||||
)
|
||||
return false
|
||||
}
|
||||
traces := make([]*types.BlockTrace, len(blockInfos))
|
||||
traces := make([]*geth_types.BlockTrace, len(blockInfos))
|
||||
for i, blockInfo := range blockInfos {
|
||||
traces[i], err = m.Client.GetBlockTraceByHash(m.ctx, common.HexToHash(blockInfo.Hash))
|
||||
if err != nil {
|
||||
@@ -450,37 +450,37 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
}
|
||||
|
||||
// Dispatch task to rollers.
|
||||
rollers := make(map[string]*orm.RollerStatus)
|
||||
rollers := make(map[string]*types.RollerStatus)
|
||||
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
|
||||
roller := m.selectRoller()
|
||||
if roller == nil {
|
||||
log.Info("selectRoller returns nil")
|
||||
break
|
||||
}
|
||||
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
|
||||
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
|
||||
// send trace to roller
|
||||
if !roller.sendTask(task.ID, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.ID)
|
||||
if !roller.sendTask(task.Hash, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
|
||||
continue
|
||||
}
|
||||
rollers[roller.PublicKey] = &orm.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: orm.RollerAssigned}
|
||||
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
|
||||
}
|
||||
// No roller assigned.
|
||||
if len(rollers) == 0 {
|
||||
log.Error("no roller assigned", "id", task.ID, "number of idle rollers", m.GetNumberOfIdleRollers())
|
||||
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
|
||||
return false
|
||||
}
|
||||
|
||||
// Update session proving status as assigned.
|
||||
if err = m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", task.ID, "err", err)
|
||||
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", task.Hash, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Create a proof generation session.
|
||||
sess := &session{
|
||||
info: &orm.SessionInfo{
|
||||
ID: task.ID,
|
||||
info: &types.SessionInfo{
|
||||
ID: task.Hash,
|
||||
Rollers: rollers,
|
||||
StartTimestamp: time.Now().Unix(),
|
||||
},
|
||||
@@ -502,7 +502,7 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.sessions[task.ID] = sess
|
||||
m.sessions[task.Hash] = sess
|
||||
m.mu.Unlock()
|
||||
go m.CollectProofs(sess)
|
||||
|
||||
@@ -517,7 +517,7 @@ func (m *Manager) IsRollerIdle(hexPk string) bool {
|
||||
// timeout. So a busy roller could be marked as idle in a finished session.
|
||||
for _, sess := range m.sessions {
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if pk == hexPk && roller.Status == orm.RollerAssigned {
|
||||
if pk == hexPk && roller.Status == types.RollerAssigned {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -527,7 +527,7 @@ func (m *Manager) IsRollerIdle(hexPk string) bool {
|
||||
}
|
||||
|
||||
func (m *Manager) addFailedSession(sess *session, errMsg string) {
|
||||
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, orm.ProvingTaskFailed, errMsg, true)
|
||||
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
|
||||
}
|
||||
|
||||
// VerifyToken verifies pukey for token and expiration time
|
||||
|
||||
@@ -5,9 +5,11 @@ import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -15,20 +17,20 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum"
|
||||
|
||||
geth_types "github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"scroll-tech/database"
|
||||
"scroll-tech/database/migrate"
|
||||
"scroll-tech/database/orm"
|
||||
|
||||
"scroll-tech/coordinator"
|
||||
client2 "scroll-tech/coordinator/client"
|
||||
|
||||
"scroll-tech/common/docker"
|
||||
"scroll-tech/common/message"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
bridge_config "scroll-tech/bridge/config"
|
||||
@@ -39,6 +41,8 @@ import (
|
||||
var (
|
||||
cfg *bridge_config.Config
|
||||
dbImg docker.ImgInstance
|
||||
|
||||
batchData *types.BatchData
|
||||
)
|
||||
|
||||
func randomURL() string {
|
||||
@@ -55,6 +59,22 @@ func setEnv(t *testing.T) (err error) {
|
||||
dbImg = docker.NewTestDBDocker(t, cfg.DBConfig.DriverName)
|
||||
cfg.DBConfig.DSN = dbImg.Endpoint()
|
||||
|
||||
templateBlockTrace, err := os.ReadFile("../common/testdata/blockTrace_02.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// unmarshal blockTrace
|
||||
blockTrace := &geth_types.BlockTrace{}
|
||||
if err = json.Unmarshal(templateBlockTrace, blockTrace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
parentBatch := &types.BlockBatch{
|
||||
Index: 1,
|
||||
Hash: "0x0000000000000000000000000000000000000000",
|
||||
}
|
||||
batchData = types.NewBatchData(parentBatch, []*geth_types.BlockTrace{blockTrace}, nil)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -218,6 +238,7 @@ func testSeveralConnections(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testValidProof(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
l2db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
@@ -248,13 +269,12 @@ func testValidProof(t *testing.T) {
|
||||
}()
|
||||
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var ids = make([]string, 1)
|
||||
var hashes = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
ids[i] = ID
|
||||
for i := range hashes {
|
||||
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
|
||||
hashes[i] = batchData.Hash().Hex()
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
@@ -263,13 +283,13 @@ func testValidProof(t *testing.T) {
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskVerified {
|
||||
ids = ids[1:]
|
||||
if status == types.ProvingTaskVerified {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
@@ -307,13 +327,12 @@ func testInvalidProof(t *testing.T) {
|
||||
}()
|
||||
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var ids = make([]string, 1)
|
||||
var hashes = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
ids[i] = ID
|
||||
for i := range hashes {
|
||||
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
|
||||
hashes[i] = batchData.Hash().Hex()
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
@@ -322,13 +341,13 @@ func testInvalidProof(t *testing.T) {
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskFailed {
|
||||
ids = ids[1:]
|
||||
if status == types.ProvingTaskFailed {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
@@ -367,13 +386,12 @@ func testIdleRollerSelection(t *testing.T) {
|
||||
|
||||
assert.Equal(t, len(rollers), rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var ids = make([]string, 2)
|
||||
var hashes = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
ids[i] = ID
|
||||
for i := range hashes {
|
||||
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
|
||||
hashes[i] = batchData.Hash().Hex()
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
@@ -382,13 +400,13 @@ func testIdleRollerSelection(t *testing.T) {
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskVerified {
|
||||
ids = ids[1:]
|
||||
if status == types.ProvingTaskVerified {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
@@ -404,12 +422,12 @@ func testGracefulRestart(t *testing.T) {
|
||||
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
|
||||
defer l2db.Close()
|
||||
|
||||
var ids = make([]string, 1)
|
||||
var hashes = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range ids {
|
||||
ids[i], err = l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
|
||||
assert.NoError(t, err)
|
||||
for i := range hashes {
|
||||
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
|
||||
hashes[i] = batchData.Hash().Hex()
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
@@ -438,15 +456,15 @@ func testGracefulRestart(t *testing.T) {
|
||||
newRollerManager.Stop()
|
||||
}()
|
||||
|
||||
for i := range ids {
|
||||
info, err := newRollerManager.GetSessionInfo(ids[i])
|
||||
assert.Equal(t, orm.ProvingTaskAssigned.String(), info.Status)
|
||||
for i := range hashes {
|
||||
info, err := newRollerManager.GetSessionInfo(hashes[i])
|
||||
assert.Equal(t, types.ProvingTaskAssigned.String(), info.Status)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// at this point, roller haven't submitted
|
||||
status, err := l2db.GetProvingStatusByID(ids[i])
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[i])
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, orm.ProvingTaskAssigned, status)
|
||||
assert.Equal(t, types.ProvingTaskAssigned, status)
|
||||
}
|
||||
|
||||
// will overwrite the roller client for `SubmitProof`
|
||||
@@ -458,15 +476,15 @@ func testGracefulRestart(t *testing.T) {
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(15 * time.Second)
|
||||
)
|
||||
for len(ids) > 0 {
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
// this proves that the roller submits to the new coordinator,
|
||||
// because the roller client for `submitProof` has been overwritten
|
||||
status, err := l2db.GetProvingStatusByID(ids[0])
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == orm.ProvingTaskVerified {
|
||||
ids = ids[1:]
|
||||
if status == types.ProvingTaskVerified {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
|
||||
case <-tickStop:
|
||||
|
||||
@@ -7,12 +7,11 @@ import (
|
||||
"time"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
geth_types "github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/message"
|
||||
|
||||
"scroll-tech/database/orm"
|
||||
"scroll-tech/common/types"
|
||||
)
|
||||
|
||||
// rollerNode records roller status and send task to connected roller.
|
||||
@@ -33,7 +32,7 @@ type rollerNode struct {
|
||||
registerTime time.Time
|
||||
}
|
||||
|
||||
func (r *rollerNode) sendTask(id string, traces []*types.BlockTrace) bool {
|
||||
func (r *rollerNode) sendTask(id string, traces []*geth_types.BlockTrace) bool {
|
||||
select {
|
||||
case r.taskChan <- &message.TaskMsg{
|
||||
ID: id,
|
||||
@@ -53,7 +52,7 @@ func (m *Manager) reloadRollerAssignedTasks(pubkey string) *cmap.ConcurrentMap {
|
||||
taskIDs := cmap.New()
|
||||
for id, sess := range m.sessions {
|
||||
for pk, roller := range sess.info.Rollers {
|
||||
if pk == pubkey && roller.Status == orm.RollerAssigned {
|
||||
if pk == pubkey && roller.Status == types.RollerAssigned {
|
||||
taskIDs.Set(id, struct{}{})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user