// Mirror of https://github.com/scroll-tech/scroll.git
// Synced 2026-01-22 04:18:07 -05:00
// 302 lines, 8.3 KiB, Go
package coordinator_test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
mathrand "math/rand"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/scroll-tech/go-ethereum/core/types"
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"scroll-tech/common/docker"
|
|
"scroll-tech/common/message"
|
|
"scroll-tech/common/utils"
|
|
db_config "scroll-tech/database"
|
|
"scroll-tech/database/orm"
|
|
|
|
"scroll-tech/bridge/mock"
|
|
|
|
"scroll-tech/coordinator"
|
|
"scroll-tech/coordinator/config"
|
|
)
|
|
|
|
// Network location of the roller manager under test. The dial address is
// derived from the listen endpoint so the two values cannot drift apart.
const (
	// managerPort is the listen endpoint passed to the coordinator config.
	managerPort = ":8132"
	// managerAddr is the host:port that test websocket clients dial.
	managerAddr = "localhost" + managerPort
)
|
|
|
|
var (
|
|
DB_CONFIG = &db_config.DBConfig{
|
|
DriverName: utils.GetEnvWithDefault("TEST_DB_DRIVER", "postgres"),
|
|
DSN: utils.GetEnvWithDefault("TEST_DB_DSN", "postgres://postgres:123456@localhost:5436/testmanager_db?sslmode=disable"),
|
|
}
|
|
|
|
TEST_CONFIG = &mock.TestConfig{
|
|
L2GethTestConfig: mock.L2GethTestConfig{
|
|
HPort: 8536,
|
|
WPort: 0,
|
|
},
|
|
DbTestconfig: mock.DbTestconfig{
|
|
DbName: "testmanager_db",
|
|
DbPort: 5436,
|
|
},
|
|
}
|
|
l2gethImg docker.ImgInstance
|
|
dbImg docker.ImgInstance
|
|
)
|
|
|
|
func setupEnv(t *testing.T) {
|
|
// initialize l2geth docker image
|
|
l2gethImg = mock.NewTestL2Docker(t, TEST_CONFIG)
|
|
// initialize db docker image
|
|
dbImg = mock.GetDbDocker(t, TEST_CONFIG)
|
|
}
|
|
|
|
func TestFunction(t *testing.T) {
|
|
// Setup
|
|
setupEnv(t)
|
|
|
|
t.Run("TestHandshake", func(t *testing.T) {
|
|
verifierEndpoint := setupMockVerifier(t)
|
|
|
|
rollerManager := setupRollerManager(t, verifierEndpoint, nil)
|
|
defer rollerManager.Stop()
|
|
|
|
// Set up client
|
|
u := url.URL{Scheme: "ws", Host: managerAddr, Path: "/"}
|
|
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
assert.NoError(t, err)
|
|
defer c.Close()
|
|
|
|
mock.PerformHandshake(t, c)
|
|
|
|
// Roller manager should send a Websocket over the GetRollerChan
|
|
select {
|
|
case <-rollerManager.GetRollerChan():
|
|
// Test succeeded
|
|
case <-time.After(coordinator.HandshakeTimeout):
|
|
t.Fail()
|
|
}
|
|
})
|
|
|
|
t.Run("TestHandshakeTimeout", func(t *testing.T) {
|
|
verifierEndpoint := setupMockVerifier(t)
|
|
|
|
rollerManager := setupRollerManager(t, verifierEndpoint, nil)
|
|
defer rollerManager.Stop()
|
|
|
|
// Set up client
|
|
u := url.URL{Scheme: "ws", Host: managerAddr, Path: "/"}
|
|
|
|
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
assert.NoError(t, err)
|
|
defer c.Close()
|
|
|
|
// Wait for the handshake timeout to pass
|
|
<-time.After(coordinator.HandshakeTimeout + 1*time.Second)
|
|
|
|
mock.PerformHandshake(t, c)
|
|
|
|
// No websocket should be received
|
|
select {
|
|
case <-rollerManager.GetRollerChan():
|
|
t.Fail()
|
|
case <-time.After(1 * time.Second):
|
|
// Test succeeded
|
|
}
|
|
})
|
|
|
|
t.Run("TestTwoConnections", func(t *testing.T) {
|
|
verifierEndpoint := setupMockVerifier(t)
|
|
rollerManager := setupRollerManager(t, verifierEndpoint, nil)
|
|
defer rollerManager.Stop()
|
|
|
|
// Set up and register 2 clients
|
|
for i := 0; i < 2; i++ {
|
|
u := url.URL{Scheme: "ws", Host: managerAddr, Path: "/"}
|
|
|
|
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
assert.NoError(t, err)
|
|
defer c.Close()
|
|
|
|
mock.PerformHandshake(t, c)
|
|
|
|
// Roller manager should send a Websocket over the GetRollerChan
|
|
select {
|
|
case <-rollerManager.GetRollerChan():
|
|
// Test succeeded
|
|
case <-time.After(coordinator.HandshakeTimeout):
|
|
t.Fail()
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("TestTriggerProofGenerationSession", func(t *testing.T) {
|
|
// prepare DB
|
|
mock.ClearDB(t, DB_CONFIG)
|
|
db := mock.PrepareDB(t, DB_CONFIG)
|
|
|
|
// Test with two clients to make sure traces messages aren't duplicated
|
|
// to rollers.
|
|
numClients := uint8(2)
|
|
verifierEndpoint := setupMockVerifier(t)
|
|
rollerManager := setupRollerManager(t, verifierEndpoint, db)
|
|
|
|
// Set up and register `numClients` clients
|
|
conns := make([]*websocket.Conn, numClients)
|
|
for i := 0; i < int(numClients); i++ {
|
|
u := url.URL{Scheme: "ws", Host: managerAddr, Path: "/"}
|
|
|
|
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
assert.NoError(t, err)
|
|
defer c.Close()
|
|
|
|
mock.PerformHandshake(t, c)
|
|
|
|
conns[i] = c
|
|
}
|
|
|
|
var results []*types.BlockResult
|
|
|
|
templateBlockResult, err := os.ReadFile("../common/testdata/blockResult_orm.json")
|
|
assert.NoError(t, err)
|
|
blockResult := &types.BlockResult{}
|
|
err = json.Unmarshal(templateBlockResult, blockResult)
|
|
assert.NoError(t, err)
|
|
results = append(results, blockResult)
|
|
templateBlockResult, err = os.ReadFile("../common/testdata/blockResult_delegate.json")
|
|
assert.NoError(t, err)
|
|
blockResult = &types.BlockResult{}
|
|
err = json.Unmarshal(templateBlockResult, blockResult)
|
|
assert.NoError(t, err)
|
|
results = append(results, blockResult)
|
|
|
|
err = db.InsertBlockResultsWithStatus(context.Background(), results, orm.BlockUnassigned)
|
|
assert.NoError(t, err)
|
|
|
|
// Need to send a tx to trigger block committed
|
|
// Sleep for a little bit, so that we can avoid prematurely fetching connections. Trigger for manager is 3 seconds
|
|
time.Sleep(4 * time.Second)
|
|
|
|
// Both rollers should now receive a `BlockTraces` message and should send something back.
|
|
for _, c := range conns {
|
|
mt, payload, err := c.ReadMessage()
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, mt, websocket.BinaryMessage)
|
|
|
|
msg := &message.Msg{}
|
|
assert.NoError(t, json.Unmarshal(payload, msg))
|
|
assert.Equal(t, msg.Type, message.BlockTrace)
|
|
|
|
traces := &message.BlockTraces{}
|
|
assert.NoError(t, json.Unmarshal(payload, traces))
|
|
|
|
}
|
|
|
|
rollerManager.Stop()
|
|
})
|
|
|
|
t.Run("TestIdleRollerSelection", func(t *testing.T) {
|
|
// Test with two clients to make sure traces messages aren't duplicated
|
|
// to rollers.
|
|
numClients := uint8(2)
|
|
verifierEndpoint := setupMockVerifier(t)
|
|
|
|
mock.ClearDB(t, DB_CONFIG)
|
|
db := mock.PrepareDB(t, DB_CONFIG)
|
|
|
|
// Ensure only one roller is picked per session.
|
|
rollerManager := setupRollerManager(t, verifierEndpoint, db)
|
|
defer rollerManager.Stop()
|
|
|
|
// Set up and register `numClients` clients
|
|
conns := make([]*websocket.Conn, numClients)
|
|
for i := 0; i < int(numClients); i++ {
|
|
u := url.URL{Scheme: "ws", Host: managerAddr, Path: "/"}
|
|
|
|
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
assert.NoError(t, err)
|
|
c.SetReadDeadline(time.Now().Add(100 * time.Second))
|
|
|
|
mock.PerformHandshake(t, c)
|
|
time.Sleep(1 * time.Second)
|
|
conns[i] = c
|
|
}
|
|
defer func() {
|
|
for _, conn := range conns {
|
|
assert.NoError(t, conn.Close())
|
|
}
|
|
}()
|
|
|
|
assert.Equal(t, 2, rollerManager.GetNumberOfIdleRollers())
|
|
|
|
templateBlockResult, err := os.ReadFile("../common/testdata/blockResult_orm.json")
|
|
assert.NoError(t, err)
|
|
blockResult := &types.BlockResult{}
|
|
err = json.Unmarshal(templateBlockResult, blockResult)
|
|
assert.NoError(t, err)
|
|
err = db.InsertBlockResultsWithStatus(context.Background(), []*types.BlockResult{blockResult}, orm.BlockUnassigned)
|
|
assert.NoError(t, err)
|
|
|
|
// Sleep for a little bit, so that we can avoid prematurely fetching connections.
|
|
// Test first roller and check if we have one roller idle one roller busy
|
|
time.Sleep(4 * time.Second)
|
|
|
|
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
|
|
|
|
templateBlockResult, err = os.ReadFile("../common/testdata/blockResult_delegate.json")
|
|
assert.NoError(t, err)
|
|
blockResult = &types.BlockResult{}
|
|
err = json.Unmarshal(templateBlockResult, blockResult)
|
|
assert.NoError(t, err)
|
|
err = db.InsertBlockResultsWithStatus(context.Background(), []*types.BlockResult{blockResult}, orm.BlockUnassigned)
|
|
assert.NoError(t, err)
|
|
|
|
// Sleep for a little bit, so that we can avoid prematurely fetching connections.
|
|
// Test Second roller and check if we have all rollers busy
|
|
time.Sleep(4 * time.Second)
|
|
|
|
for _, c := range conns {
|
|
c.ReadMessage()
|
|
}
|
|
|
|
assert.Equal(t, 0, rollerManager.GetNumberOfIdleRollers())
|
|
})
|
|
// Teardown
|
|
t.Cleanup(func() {
|
|
assert.NoError(t, l2gethImg.Stop())
|
|
assert.NoError(t, dbImg.Stop())
|
|
})
|
|
|
|
}
|
|
|
|
func setupRollerManager(t *testing.T, verifierEndpoint string, orm orm.BlockResultOrm) *coordinator.Manager {
|
|
rollerManager, err := coordinator.New(context.Background(), &config.RollerManagerConfig{
|
|
Endpoint: managerPort,
|
|
RollersPerSession: 1,
|
|
VerifierEndpoint: verifierEndpoint,
|
|
CollectionTime: 1,
|
|
}, orm)
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, rollerManager.Start())
|
|
|
|
return rollerManager
|
|
}
|
|
|
|
func setupMockVerifier(t *testing.T) string {
|
|
id := strconv.Itoa(mathrand.Int())
|
|
verifierEndpoint := "/tmp/" + id + ".sock"
|
|
err := os.RemoveAll(verifierEndpoint)
|
|
assert.NoError(t, err)
|
|
|
|
mock.SetupMockVerifier(t, verifierEndpoint)
|
|
|
|
return verifierEndpoint
|
|
}
|