Merge branch 'staging' into fix/upgrade_blockheigth_type

This commit is contained in:
Nazarii Denha
2023-01-13 12:09:05 +01:00
committed by GitHub
21 changed files with 224 additions and 212 deletions

3
.github/pull_request_template.md vendored Normal file
View File

@@ -0,0 +1,3 @@
1. Purpose or design rationale of this PR
2. Does this PR involve a new deployment, and involve a new git tag & docker image tag? If so, has `tag` in `common/version.go` been updated?
3. Is this PR a breaking change? If so, have it been attached a `breaking-change` label?

View File

@@ -15,5 +15,5 @@ func TestRunBridge(t *testing.T) {
// wait result
bridge.ExpectWithTimeout(true, time.Second*3, fmt.Sprintf("bridge version %s", version.Version))
bridge.RunApp(false)
bridge.RunApp(nil)
}

View File

@@ -179,6 +179,13 @@ linters:
- depguard
- gocyclo
- unparam
- exportloopref
- sqlclosecheck
- rowserrcheck
- durationcheck
- bidichk
- typecheck
- unused
enable-all: false
disable:
@@ -200,16 +207,7 @@ issues:
# Exclude some linters from running on tests files.
- path: _test\.go
linters:
- gocyclo
- errcheck
- dupl
- gosec
# Exclude known linters from partially hard-vendored code,
# which is impossible to exclude via "nolint" comments.
- path: internal/hmac/
text: "weak cryptographic primitive"
linters:
- gosec
# Exclude some staticcheck messages
@@ -217,18 +215,6 @@ issues:
- staticcheck
text: "SA9003:"
- linters:
- golint
text: "package comment should be of the form"
- linters:
- golint
text: "don't use ALL_CAPS in Go names;"
- linters:
- golint
text: "don't use underscores in Go names;"
# Exclude lll issues for long lines with go:generate
- linters:
- lll

View File

@@ -1,17 +1,13 @@
package cmd
import (
"fmt"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"
"github.com/docker/docker/pkg/reexec"
cmap "github.com/orcaman/concurrent-map"
"github.com/stretchr/testify/assert"
)
var verbose bool
@@ -51,48 +47,6 @@ func NewCmd(t *testing.T, name string, args ...string) *Cmd {
}
}
// RunApp exec's the current binary using name as argv[0] which will trigger the
// reexec init function for that name (e.g. "geth-test" in cmd/geth/run_test.go)
func (t *Cmd) RunApp(parallel bool) {
t.Log("cmd: ", append([]string{t.name}, t.args...))
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{t.name}, t.args...),
Stderr: t,
Stdout: t,
}
if parallel {
go func() {
_ = cmd.Run()
}()
} else {
_ = cmd.Run()
}
t.mu.Lock()
t.cmd = cmd
t.mu.Unlock()
}
// WaitExit wait util process exit.
func (t *Cmd) WaitExit() {
// Wait all the check funcs are finished or test status is failed.
for !(t.Failed() || t.checkFuncs.IsEmpty()) {
<-time.After(time.Millisecond * 500)
}
// Send interrupt signal.
t.mu.Lock()
_ = t.cmd.Process.Signal(os.Interrupt)
t.mu.Unlock()
}
// Interrupt send interrupt signal.
func (t *Cmd) Interrupt() {
t.mu.Lock()
t.Err = t.cmd.Process.Signal(os.Interrupt)
t.mu.Unlock()
}
// RegistFunc register check func
func (t *Cmd) RegistFunc(key string, check checkFunc) {
t.checkFuncs.Set(key, check)
@@ -103,39 +57,6 @@ func (t *Cmd) UnRegistFunc(key string) {
t.checkFuncs.Pop(key)
}
// ExpectWithTimeout wait result during timeout time.
func (t *Cmd) ExpectWithTimeout(parallel bool, timeout time.Duration, keyword string) {
if keyword == "" {
return
}
okCh := make(chan struct{}, 1)
t.RegistFunc(keyword, func(buf string) {
if strings.Contains(buf, keyword) {
select {
case okCh <- struct{}{}:
default:
return
}
}
})
waitResult := func() {
defer t.UnRegistFunc(keyword)
select {
case <-okCh:
return
case <-time.After(timeout):
assert.Fail(t, fmt.Sprintf("didn't get the desired result before timeout, keyword: %s", keyword))
}
}
if parallel {
go waitResult()
} else {
waitResult()
}
}
func (t *Cmd) runCmd() {
cmd := exec.Command(t.args[0], t.args[1:]...) //nolint:gosec
cmd.Stdout = t

114
common/cmd/cmd_app.go Normal file
View File

@@ -0,0 +1,114 @@
package cmd
import (
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/docker/docker/pkg/reexec"
"github.com/stretchr/testify/assert"
)
// RunApp exec's the current binary using name as argv[0] which will trigger the
// reexec init function for that name (e.g. "geth-test" in cmd/geth/run_test.go)
func (t *Cmd) RunApp(waitResult func() bool) {
t.Log("cmd: ", append([]string{t.name}, t.args...))
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{t.name}, t.args...),
Stderr: t,
Stdout: t,
}
if waitResult != nil {
go func() {
_ = cmd.Run()
}()
waitResult()
} else {
_ = cmd.Run()
}
t.mu.Lock()
t.cmd = cmd
t.mu.Unlock()
}
// WaitExit wait util process exit.
func (t *Cmd) WaitExit() {
// Wait all the check funcs are finished or test status is failed.
for !(t.Failed() || t.checkFuncs.IsEmpty()) {
<-time.After(time.Millisecond * 500)
}
// Send interrupt signal.
t.mu.Lock()
_ = t.cmd.Process.Signal(os.Interrupt)
t.mu.Unlock()
}
// Interrupt send interrupt signal.
func (t *Cmd) Interrupt() {
t.mu.Lock()
t.Err = t.cmd.Process.Signal(os.Interrupt)
t.mu.Unlock()
}
// WaitResult return true when get the keyword during timeout.
func (t *Cmd) WaitResult(timeout time.Duration, keyword string) bool {
if keyword == "" {
return false
}
okCh := make(chan struct{}, 1)
t.RegistFunc(keyword, func(buf string) {
if strings.Contains(buf, keyword) {
select {
case okCh <- struct{}{}:
default:
return
}
}
})
defer t.UnRegistFunc(keyword)
select {
case <-okCh:
return true
case <-time.After(timeout):
assert.Fail(t, fmt.Sprintf("didn't get the desired result before timeout, keyword: %s", keyword))
}
return false
}
// ExpectWithTimeout wait result during timeout time.
func (t *Cmd) ExpectWithTimeout(parallel bool, timeout time.Duration, keyword string) {
if keyword == "" {
return
}
okCh := make(chan struct{}, 1)
t.RegistFunc(keyword, func(buf string) {
if strings.Contains(buf, keyword) {
select {
case okCh <- struct{}{}:
default:
return
}
}
})
waitResult := func() {
defer t.UnRegistFunc(keyword)
select {
case <-okCh:
return
case <-time.After(timeout):
assert.Fail(t, fmt.Sprintf("didn't get the desired result before timeout, keyword: %s", keyword))
}
}
if parallel {
go waitResult()
} else {
waitResult()
}
}

View File

@@ -12,7 +12,6 @@ require (
github.com/scroll-tech/go-ethereum v1.10.14-0.20221221073256-5ca70bf3a257
github.com/stretchr/testify v1.8.0
github.com/urfave/cli/v2 v2.10.2
golang.org/x/sync v0.1.0
)
require (
@@ -81,6 +80,7 @@ require (
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect

View File

@@ -4,12 +4,12 @@ import (
"crypto/ecdsa"
"crypto/rand"
"encoding/hex"
"encoding/json"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/common/hexutil"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/rlp"
)
// RespStatus represents status code from roller to scroll
@@ -36,11 +36,11 @@ type AuthMsg struct {
type Identity struct {
// Roller name
Name string `json:"name"`
// Time of message creation
Timestamp int64 `json:"timestamp"`
// Unverified Unix timestamp of message creation
Timestamp uint32 `json:"timestamp"`
// Roller public key
PublicKey string `json:"publicKey"`
// Version is common.Version+ZK_VERSION. Use the following to check the latest ZK_VERSION version.
// Version is common.Version+ZkVersion. Use the following to check the latest ZkVersion version.
// curl -sL https://api.github.com/repos/scroll-tech/scroll-zkevm/commits | jq -r ".[0].sha"
Version string `json:"version"`
// Random unique token generated by manager
@@ -115,12 +115,11 @@ func (a *AuthMsg) PublicKey() (string, error) {
// Hash returns the hash of the auth message, which should be the message used
// to construct the Signature.
func (i *Identity) Hash() ([]byte, error) {
bs, err := json.Marshal(i)
byt, err := rlp.EncodeToBytes(i)
if err != nil {
return nil, err
}
hash := crypto.Keccak256Hash(bs)
hash := crypto.Keccak256Hash(byt)
return hash[:], nil
}
@@ -204,12 +203,12 @@ type ProofDetail struct {
// Hash return proofMsg content hash.
func (z *ProofDetail) Hash() ([]byte, error) {
bs, err := json.Marshal(z)
byt, err := rlp.EncodeToBytes(z)
if err != nil {
return nil, err
}
hash := crypto.Keccak256Hash(bs)
hash := crypto.Keccak256Hash(byt)
return hash[:], nil
}

View File

@@ -16,7 +16,7 @@ func TestAuthMessageSignAndVerify(t *testing.T) {
authMsg := &AuthMsg{
Identity: &Identity{
Name: "testRoller",
Timestamp: time.Now().UnixNano(),
Timestamp: uint32(time.Now().Unix()),
},
}
assert.NoError(t, authMsg.Sign(privkey))

View File

@@ -1,30 +1,10 @@
package utils
import (
"sync/atomic"
"github.com/scroll-tech/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
// ComputeTraceGasCost computes gascost based on ExecutionResults.StructLogs.GasCost
func ComputeTraceGasCost(trace *types.BlockTrace) uint64 {
var (
gasCost uint64
eg errgroup.Group
)
for idx := range trace.ExecutionResults {
i := idx
eg.Go(func() error {
var sum uint64
for _, log := range trace.ExecutionResults[i].StructLogs {
sum += log.GasCost
}
atomic.AddUint64(&gasCost, sum)
return nil
})
}
_ = eg.Wait()
return gasCost
return trace.Header.GasUsed
}

View File

@@ -19,13 +19,7 @@ func TestComputeTraceCost(t *testing.T) {
blockTrace := &types.BlockTrace{}
err = json.Unmarshal(templateBlockTrace, blockTrace)
assert.NoError(t, err)
var sum uint64
for _, v := range blockTrace.ExecutionResults {
for _, sv := range v.StructLogs {
sum += sv.GasCost
}
}
res := utils.ComputeTraceGasCost(blockTrace)
assert.Equal(t, sum, res)
var expected = blockTrace.Header.GasUsed
got := utils.ComputeTraceGasCost(blockTrace)
assert.Equal(t, expected, got)
}

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "prealpha-v9.3"
var tag = "prealpha-v10.2"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
@@ -22,8 +22,8 @@ var commit = func() string {
return ""
}()
// ZK_VERSION is commit-id of common/libzkp/impl/cargo.lock/scroll-zkevm
var ZK_VERSION string
// ZkVersion is commit-id of common/libzkp/impl/cargo.lock/scroll-zkevm
var ZkVersion string
// Version denote the version of scroll protocol, including the l2geth, relayer, coordinator, roller, contracts and etc.
var Version = fmt.Sprintf("%s-%s-%s", tag, commit, ZK_VERSION)
var Version = fmt.Sprintf("%s-%s-%s", tag, commit, ZkVersion)

View File

@@ -18,7 +18,7 @@ libzkp:
cp -r ../common/libzkp/interface ./verifier/lib
coordinator: libzkp ## Builds the Coordinator instance.
go build -ldflags "-X scroll-tech/common/version.ZK_VERSION=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator ./cmd
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator ./cmd
test-verifier: libzkp
go test -tags ffi -timeout 0 -v ./verifier

View File

@@ -15,5 +15,5 @@ func TestRunCoordinator(t *testing.T) {
// wait result
coordinator.ExpectWithTimeout(true, time.Second*3, fmt.Sprintf("coordinator version %s", version.Version))
coordinator.RunApp(false)
coordinator.RunApp(nil)
}

View File

@@ -198,7 +198,12 @@ func (m *Manager) restorePrevSessions() {
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
for _, roller := range sess.info.Rollers {
log.Info("restore roller info for session", "session id", sess.info.ID, "roller name", roller.Name, "public key", roller.PublicKey, "proof status", roller.Status)
log.Info(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
go m.CollectProofs(sess)
@@ -274,11 +279,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
"roller pk", roller.PublicKey,
"error", msg.Error,
)
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskFailed); dbErr != nil {
log.Error("failed to update task status as failed", "error", dbErr)
}
// record the failed session.
m.addFailedSession(sess, msg.Error)
return nil
}
@@ -303,8 +303,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
success, err = m.verifier.VerifyProof(msg.Proof)
if err != nil {
// record failed session.
m.addFailedSession(sess, err.Error())
// TODO: this is only a temp workaround for testnet, we should return err in real cases
success = false
log.Error("Failed to verify zk proof", "proof id", msg.ID, "error", err)
@@ -313,21 +311,17 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
}
var status orm.ProvingStatus
if success {
status = orm.ProvingTaskVerified
} else {
// Set status as skipped if verification fails.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
status = orm.ProvingTaskFailed
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskVerified); dbErr != nil {
log.Error(
"failed to update proving_status",
"msg.ID", msg.ID,
"status", orm.ProvingTaskVerified,
"error", dbErr)
}
return dbErr
}
if dbErr = m.orm.UpdateProvingStatus(msg.ID, status); dbErr != nil {
log.Error("failed to update proving_status", "msg.ID", msg.ID, "status", status, "error", dbErr)
}
return dbErr
return nil
}
// CollectProofs collects proofs corresponding to a proof generation session.
@@ -398,12 +392,12 @@ func (m *Manager) APIs() []rpc.API {
// StartProofGenerationSession starts a proof generation session
func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success bool) {
roller := m.selectRoller()
if roller == nil {
if m.GetNumberOfIdleRollers() == 0 {
log.Warn("no idle roller when starting proof generation session", "id", task.ID)
return false
}
log.Info("start proof generation session", "id", task.ID)
log.Info("start proof generation session", "id", task.ID)
defer func() {
if !success {
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskUnassigned); err != nil {
@@ -411,11 +405,8 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
}
}
}()
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.ID, "err", err)
return false
}
// Get block traces.
blockInfos, err := m.orm.GetBlockInfos(map[string]interface{}{"batch_id": task.ID})
if err != nil {
log.Error(
@@ -425,7 +416,6 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
)
return false
}
traces := make([]*types.BlockTrace, len(blockInfos))
for i, blockInfo := range blockInfos {
traces[i], err = m.Client.GetBlockTraceByHash(m.ctx, common.HexToHash(blockInfo.Hash))
@@ -440,41 +430,61 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
}
}
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.ID, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.ID)
// Dispatch task to rollers.
rollers := make(map[string]*orm.RollerStatus)
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller()
if roller == nil {
break
}
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.ID, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.ID)
continue
}
rollers[roller.PublicKey] = &orm.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: orm.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", task.ID, "number of idle rollers", m.GetNumberOfIdleRollers())
return false
}
// Update session proving status as assigned.
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.ID, "err", err)
return false
}
pk := roller.PublicKey
// Create a proof generation session.
s := &session{
sess := &session{
info: &orm.SessionInfo{
ID: task.ID,
Rollers: map[string]*orm.RollerStatus{
pk: {
PublicKey: pk,
Name: roller.Name,
Status: orm.RollerAssigned,
},
},
ID: task.ID,
Rollers: rollers,
StartTimestamp: time.Now().Unix(),
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
// Store session info.
if err = m.orm.SetSessionInfo(s.info); err != nil {
log.Error("db set session info fail", "roller name", roller.Name, "public key", pk, "error", err)
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "error", err)
for _, roller := range sess.info.Rollers {
log.Error(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
return false
}
m.mu.Lock()
m.sessions[task.ID] = s
m.sessions[task.ID] = sess
m.mu.Unlock()
go m.CollectProofs(s)
go m.CollectProofs(sess)
return true
}

View File

@@ -129,7 +129,7 @@ func testFailedHandshake(t *testing.T) {
authMsg := &message.AuthMsg{
Identity: &message.Identity{
Name: name,
Timestamp: time.Now().UnixNano(),
Timestamp: uint32(time.Now().Unix()),
},
}
assert.NoError(t, authMsg.Sign(privkey))
@@ -147,7 +147,7 @@ func testFailedHandshake(t *testing.T) {
authMsg = &message.AuthMsg{
Identity: &message.Identity{
Name: name,
Timestamp: time.Now().UnixNano(),
Timestamp: uint32(time.Now().Unix()),
},
}
assert.NoError(t, authMsg.Sign(privkey))
@@ -421,7 +421,7 @@ func (r *mockRoller) connectToCoordinator() (*client2.Client, ethereum.Subscript
authMsg := &message.AuthMsg{
Identity: &message.Identity{
Name: r.rollerName,
Timestamp: time.Now().UnixNano(),
Timestamp: uint32(time.Now().Unix()),
},
}
_ = authMsg.Sign(r.privKey)

View File

@@ -15,5 +15,5 @@ func TestRunDatabase(t *testing.T) {
// wait result
bridge.ExpectWithTimeout(true, time.Second*3, fmt.Sprintf("db_cli version %s", version.Version))
bridge.RunApp(false)
bridge.RunApp(nil)
}

View File

@@ -14,10 +14,10 @@ libzkp:
cp -r ../common/libzkp/interface ./prover/lib
roller: libzkp ## Build the Roller instance.
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZK_VERSION=${ZK_VERSION}" -o $(PWD)/build/bin/roller ./cmd
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/roller ./cmd
gpu-roller: libzkp ## Build the GPU Roller instance.
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZK_VERSION=${ZK_VERSION}" -tags gpu -o $(PWD)/build/bin/roller ./cmd
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -tags gpu -o $(PWD)/build/bin/roller ./cmd
mock_roller:
GOBIN=$(PWD)/build/bin go build -tags mock_prover -o $(PWD)/build/bin/roller $(PWD)/cmd

View File

@@ -15,5 +15,5 @@ func TestRunRoller(t *testing.T) {
// wait result
roller.ExpectWithTimeout(true, time.Second*3, fmt.Sprintf("roller version %s", version.Version))
roller.RunApp(false)
roller.RunApp(nil)
}

View File

@@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"errors"
"fmt"
"math"
"sort"
"sync/atomic"
"time"
@@ -104,10 +105,16 @@ func (r *Roller) Start() {
// Register registers Roller to the coordinator through Websocket.
func (r *Roller) Register() error {
timestamp := time.Now().Unix()
if timestamp < 0 || timestamp > math.MaxUint32 {
panic("Expected current time to be between the years 1970 and 2106")
}
authMsg := &message.AuthMsg{
Identity: &message.Identity{
Name: r.cfg.RollerName,
Timestamp: time.Now().UnixMilli(),
Timestamp: uint32(timestamp),
PublicKey: r.PublicKey(),
Version: version.Version,
},

View File

@@ -80,7 +80,8 @@ func free(t *testing.T) {
}
type appAPI interface {
RunApp(parallel bool)
WaitResult(timeout time.Duration, keyword string) bool
RunApp(waitResult func() bool)
WaitExit()
ExpectWithTimeout(parallel bool, timeout time.Duration, keyword string)
}
@@ -103,7 +104,7 @@ func runDBCliApp(t *testing.T, option, keyword string) {
// Wait expect result.
app.ExpectWithTimeout(true, time.Second*3, keyword)
app.RunApp(false)
app.RunApp(nil)
}
func runRollerApp(t *testing.T, args ...string) appAPI {

View File

@@ -28,19 +28,16 @@ func testStartProcess(t *testing.T) {
// Start bridge process.
bridgeCmd := runBridgeApp(t)
bridgeCmd.ExpectWithTimeout(true, time.Second*20, "Start bridge successfully")
bridgeCmd.RunApp(true)
bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(time.Second*20, "Start bridge successfully") })
// Start coordinator process.
coordinatorCmd := runCoordinatorApp(t, "--ws", "--ws.port", "8391")
coordinatorCmd.ExpectWithTimeout(true, time.Second*20, "Start coordinator successfully")
coordinatorCmd.RunApp(true)
coordinatorCmd.RunApp(func() bool { return coordinatorCmd.WaitResult(time.Second*20, "Start coordinator successfully") })
// Start roller process.
rollerCmd := runRollerApp(t)
rollerCmd.ExpectWithTimeout(true, time.Second*40, "roller start successfully")
rollerCmd.ExpectWithTimeout(true, time.Second*60, "register to coordinator successfully!")
rollerCmd.RunApp(true)
rollerCmd.RunApp(func() bool { return rollerCmd.WaitResult(time.Second*40, "roller start successfully") })
rollerCmd.WaitExit()
bridgeCmd.WaitExit()