mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 23:18:07 -05:00
Compare commits
41 Commits
docs/bridg
...
rename/pro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90637857f3 | ||
|
|
9cc3499309 | ||
|
|
e23ac6df47 | ||
|
|
c4980b5cdc | ||
|
|
6e8f222eff | ||
|
|
c5255df06f | ||
|
|
307d7911f3 | ||
|
|
1ab824c623 | ||
|
|
60f635d689 | ||
|
|
b3e618c2d5 | ||
|
|
f62f66397b | ||
|
|
85a229e8c3 | ||
|
|
bfc88f4e85 | ||
|
|
4ef0f43499 | ||
|
|
d376f2c516 | ||
|
|
81a547d1b0 | ||
|
|
f25c978db3 | ||
|
|
74489c5f07 | ||
|
|
9de780c5d7 | ||
|
|
6bc2ef471a | ||
|
|
c4a6eb35e6 | ||
|
|
6c47ad9a61 | ||
|
|
6219486f7d | ||
|
|
5745bf2ab7 | ||
|
|
8ce7d26fe8 | ||
|
|
1c2625c23a | ||
|
|
707d51f693 | ||
|
|
93a190142f | ||
|
|
0db7c8a424 | ||
|
|
bbaeed9150 | ||
|
|
f06b3b40a2 | ||
|
|
4e74ef769f | ||
|
|
5cae15bcca | ||
|
|
c2d48228bc | ||
|
|
8e7efa9b9e | ||
|
|
ec5bb23f77 | ||
|
|
5d8b969b5d | ||
|
|
635b93f297 | ||
|
|
8d52f061f4 | ||
|
|
cf0343d6b0 | ||
|
|
3a6005c650 |
2
.github/workflows/integration.yaml
vendored
2
.github/workflows/integration.yaml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: 1.18.x
|
||||
go-version: 1.19.x
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
- name: Install Solc
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Roller
|
||||
name: Prover
|
||||
|
||||
on:
|
||||
push:
|
||||
@@ -8,8 +8,8 @@ on:
|
||||
- develop
|
||||
- alpha
|
||||
paths:
|
||||
- 'roller/**'
|
||||
- '.github/workflows/roller.yml'
|
||||
- 'prover/**'
|
||||
- '.github/workflows/prover.yml'
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
@@ -17,12 +17,12 @@ on:
|
||||
- synchronize
|
||||
- ready_for_review
|
||||
paths:
|
||||
- 'roller/**'
|
||||
- '.github/workflows/roller.yml'
|
||||
- 'prover/**'
|
||||
- '.github/workflows/prover.yml'
|
||||
|
||||
defaults:
|
||||
run:
|
||||
working-directory: 'roller'
|
||||
working-directory: 'prover'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@@ -43,7 +43,7 @@ jobs:
|
||||
env:
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
with:
|
||||
flags: roller
|
||||
flags: prover
|
||||
compile:
|
||||
if: github.event_name == 'push' # will only be triggered when pushing to main & staging & develop & alpha
|
||||
runs-on: ubuntu-latest
|
||||
@@ -65,7 +65,7 @@ jobs:
|
||||
workspaces: "common/libzkp/impl -> target"
|
||||
- name: Test
|
||||
run: |
|
||||
make roller
|
||||
make prover
|
||||
check:
|
||||
if: github.event.pull_request.draft == false
|
||||
runs-on: ubuntu-latest
|
||||
@@ -92,7 +92,7 @@ jobs:
|
||||
uses: actions/checkout@v2
|
||||
- name: Install goimports
|
||||
run: go install golang.org/x/tools/cmd/goimports
|
||||
- run: goimports -local scroll-tech/roller/ -w .
|
||||
- run: goimports -local scroll-tech/prover/ -w .
|
||||
- run: go mod tidy
|
||||
# If there are any diffs from goimports or go mod tidy, fail.
|
||||
- name: Verify no changes from goimports and go mod tidy
|
||||
6
Makefile
6
Makefile
@@ -10,7 +10,7 @@ lint: ## The code's format and security checks.
|
||||
make -C common lint
|
||||
make -C coordinator lint
|
||||
make -C database lint
|
||||
make -C roller lint
|
||||
make -C prover lint
|
||||
make -C bridge-history-api lint
|
||||
|
||||
update: ## update dependencies
|
||||
@@ -20,13 +20,13 @@ update: ## update dependencies
|
||||
cd $(PWD)/common/ && go get -u github.com/scroll-tech/go-ethereum@scroll-v4.1.0 && go mod tidy
|
||||
cd $(PWD)/coordinator/ && go get -u github.com/scroll-tech/go-ethereum@scroll-v4.1.0 && go mod tidy
|
||||
cd $(PWD)/database/ && go get -u github.com/scroll-tech/go-ethereum@scroll-v4.1.0 && go mod tidy
|
||||
cd $(PWD)/roller/ && go get -u github.com/scroll-tech/go-ethereum@scroll-v4.1.0 && go mod tidy
|
||||
cd $(PWD)/prover/ && go get -u github.com/scroll-tech/go-ethereum@scroll-v4.1.0 && go mod tidy
|
||||
goimports -local $(PWD)/bridge/ -w .
|
||||
goimports -local $(PWD)/bridge-history-api/ -w .
|
||||
goimports -local $(PWD)/common/ -w .
|
||||
goimports -local $(PWD)/coordinator/ -w .
|
||||
goimports -local $(PWD)/database/ -w .
|
||||
goimports -local $(PWD)/roller/ -w .
|
||||
goimports -local $(PWD)/prover/ -w .
|
||||
|
||||
dev_docker: ## build docker images for development/testing usages
|
||||
docker build -t scroll_l1geth ./common/docker/l1geth/
|
||||
|
||||
@@ -109,7 +109,7 @@ func (r *RelayerConfig) UnmarshalJSON(input []byte) error {
|
||||
for _, privStr := range jsonConfig.RollupSenderPrivateKeys {
|
||||
priv, err := crypto.ToECDSA(common.FromHex(privStr))
|
||||
if err != nil {
|
||||
return fmt.Errorf("incorrect roller_private_key format, err: %v", err)
|
||||
return fmt.Errorf("incorrect prover_private_key format, err: %v", err)
|
||||
}
|
||||
r.RollupSenderPrivateKeys = append(r.RollupSenderPrivateKeys, priv)
|
||||
}
|
||||
|
||||
@@ -417,7 +417,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
// The proof for this block is not ready yet.
|
||||
return
|
||||
case types.ProvingTaskProved:
|
||||
// It's an intermediate state. The roller manager received the proof but has not verified
|
||||
// It's an intermediate state. The prover manager received the proof but has not verified
|
||||
// the proof yet. We don't roll up the proof until it's verified.
|
||||
return
|
||||
case types.ProvingTaskVerified:
|
||||
|
||||
@@ -24,7 +24,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -7,7 +7,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -7,7 +7,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -7,7 +7,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -7,7 +7,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -7,7 +7,7 @@ COPY ./bridge/go.* ./bridge/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./roller/go.* ./roller/
|
||||
COPY ./prover/go.* ./prover/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
@@ -31,7 +31,7 @@ flag_management:
|
||||
- type: project
|
||||
target: auto
|
||||
threshold: 1%
|
||||
- name: roller
|
||||
- name: prover
|
||||
statuses:
|
||||
- type: project
|
||||
target: auto
|
||||
|
||||
@@ -4,6 +4,7 @@ go 1.19
|
||||
|
||||
require (
|
||||
github.com/docker/docker v23.0.6+incompatible
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible
|
||||
github.com/jmoiron/sqlx v1.3.5
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/mattn/go-colorable v0.1.13
|
||||
|
||||
@@ -131,6 +131,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
|
||||
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
|
||||
@@ -68,28 +68,28 @@ const (
|
||||
MsgRelayFailed
|
||||
)
|
||||
|
||||
// RollerProveStatus is the roller prove status of a block batch (session)
|
||||
type RollerProveStatus int32
|
||||
// ProverProveStatus is the prover prove status of a block batch (session)
|
||||
type ProverProveStatus int32
|
||||
|
||||
const (
|
||||
// RollerProveStatusUndefined indicates an unknown roller proving status
|
||||
RollerProveStatusUndefined RollerProveStatus = iota
|
||||
// RollerAssigned indicates roller assigned but has not submitted proof
|
||||
RollerAssigned
|
||||
// RollerProofValid indicates roller has submitted valid proof
|
||||
RollerProofValid
|
||||
// RollerProofInvalid indicates roller has submitted invalid proof
|
||||
RollerProofInvalid
|
||||
// ProverProveStatusUndefined indicates an unknown prover proving status
|
||||
ProverProveStatusUndefined ProverProveStatus = iota
|
||||
// ProverAssigned indicates prover assigned but has not submitted proof
|
||||
ProverAssigned
|
||||
// ProverProofValid indicates prover has submitted valid proof
|
||||
ProverProofValid
|
||||
// ProverProofInvalid indicates prover has submitted invalid proof
|
||||
ProverProofInvalid
|
||||
)
|
||||
|
||||
func (s RollerProveStatus) String() string {
|
||||
func (s ProverProveStatus) String() string {
|
||||
switch s {
|
||||
case RollerAssigned:
|
||||
return "RollerAssigned"
|
||||
case RollerProofValid:
|
||||
return "RollerProofValid"
|
||||
case RollerProofInvalid:
|
||||
return "RollerProofInvalid"
|
||||
case ProverAssigned:
|
||||
return "ProverAssigned"
|
||||
case ProverProofValid:
|
||||
return "ProverProofValid"
|
||||
case ProverProofInvalid:
|
||||
return "ProverProofInvalid"
|
||||
default:
|
||||
return fmt.Sprintf("Bad Value: %d", int32(s))
|
||||
}
|
||||
@@ -99,7 +99,7 @@ func (s RollerProveStatus) String() string {
|
||||
type ProverTaskFailureType int
|
||||
|
||||
const (
|
||||
// ProverTaskFailureTypeUndefined indicates an unknown roller failure type
|
||||
// ProverTaskFailureTypeUndefined indicates an unknown prover failure type
|
||||
ProverTaskFailureTypeUndefined ProverTaskFailureType = iota
|
||||
// ProverTaskFailureTypeTimeout prover task failure of timeout
|
||||
ProverTaskFailureTypeTimeout
|
||||
|
||||
@@ -6,30 +6,30 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRollerProveStatus(t *testing.T) {
|
||||
func TestProverProveStatus(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
s RollerProveStatus
|
||||
s ProverProveStatus
|
||||
want string
|
||||
}{
|
||||
{
|
||||
"RollerAssigned",
|
||||
RollerAssigned,
|
||||
"RollerAssigned",
|
||||
"ProverAssigned",
|
||||
ProverAssigned,
|
||||
"ProverAssigned",
|
||||
},
|
||||
{
|
||||
"RollerProofValid",
|
||||
RollerProofValid,
|
||||
"RollerProofValid",
|
||||
"ProverProofValid",
|
||||
ProverProofValid,
|
||||
"ProverProofValid",
|
||||
},
|
||||
{
|
||||
"RollerProofInvalid",
|
||||
RollerProofInvalid,
|
||||
"RollerProofInvalid",
|
||||
"ProverProofInvalid",
|
||||
ProverProofInvalid,
|
||||
"ProverProofInvalid",
|
||||
},
|
||||
{
|
||||
"Bad Value",
|
||||
RollerProveStatus(999), // Invalid value.
|
||||
ProverProveStatus(999), // Invalid value.
|
||||
"Bad Value: 999",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -2,10 +2,11 @@ package message
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang-jwt/jwt"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/common/hexutil"
|
||||
@@ -13,7 +14,7 @@ import (
|
||||
"github.com/scroll-tech/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
// RespStatus represents status code from roller to scroll
|
||||
// RespStatus represents status code from prover to scroll
|
||||
type RespStatus uint32
|
||||
|
||||
const (
|
||||
@@ -23,7 +24,7 @@ const (
|
||||
StatusProofError
|
||||
)
|
||||
|
||||
// ProofType represents the type of roller.
|
||||
// ProofType represents the type of prover.
|
||||
type ProofType uint8
|
||||
|
||||
func (r ProofType) String() string {
|
||||
@@ -40,28 +41,30 @@ func (r ProofType) String() string {
|
||||
const (
|
||||
// ProofTypeUndefined is an unknown proof type
|
||||
ProofTypeUndefined ProofType = iota
|
||||
// ProofTypeChunk is default roller, it only generates zk proof from traces.
|
||||
// ProofTypeChunk is default prover, it only generates zk proof from traces.
|
||||
ProofTypeChunk
|
||||
// ProofTypeBatch generates zk proof from other zk proofs and aggregate them into one proof.
|
||||
ProofTypeBatch
|
||||
)
|
||||
|
||||
// AuthMsg is the first message exchanged from the Roller to the Sequencer.
|
||||
// It effectively acts as a registration, and makes the Roller identification
|
||||
// AuthMsg is the first message exchanged from the Prover to the Sequencer.
|
||||
// It effectively acts as a registration, and makes the Prover identification
|
||||
// known to the Sequencer.
|
||||
type AuthMsg struct {
|
||||
// Message fields
|
||||
Identity *Identity `json:"message"`
|
||||
// Roller signature
|
||||
// Prover signature
|
||||
Signature string `json:"signature"`
|
||||
// Jwt claims
|
||||
JwtClaims jwt.StandardClaims `json:"jwt_claims,omitempty"`
|
||||
}
|
||||
|
||||
// Identity contains all the fields to be signed by the roller.
|
||||
// Identity contains all the fields to be signed by the prover.
|
||||
type Identity struct {
|
||||
// Roller name
|
||||
// Prover name
|
||||
Name string `json:"name"`
|
||||
// Roller RollerType
|
||||
RollerType ProofType `json:"roller_type,omitempty"`
|
||||
// Prover ProverType
|
||||
ProverType ProofType `json:"prover_type,omitempty"`
|
||||
// Version is common.Version+ZkVersion. Use the following to check the latest ZkVersion version.
|
||||
// curl -sL https://api.github.com/repos/scroll-tech/scroll-prover/commits | jq -r ".[0].sha"
|
||||
Version string `json:"version"`
|
||||
@@ -70,12 +73,25 @@ type Identity struct {
|
||||
}
|
||||
|
||||
// GenerateToken generates token
|
||||
func GenerateToken() (string, error) {
|
||||
b := make([]byte, 16)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
return "", err
|
||||
func GenerateToken(tokenExpire time.Duration, secret []byte) (string, error) {
|
||||
var auth AuthMsg
|
||||
auth.JwtClaims.ExpiresAt = time.Now().Add(tokenExpire).Unix()
|
||||
token := jwt.NewWithClaims(jwt.SigningMethodHS256, auth.JwtClaims)
|
||||
return token.SignedString(secret)
|
||||
}
|
||||
|
||||
// VerifyToken verifies token
|
||||
func VerifyToken(secret []byte, tokenStr string) (bool, error) {
|
||||
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (interface{}, error) {
|
||||
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
|
||||
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
|
||||
}
|
||||
return secret, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return hex.EncodeToString(b), nil
|
||||
return token.Valid, nil
|
||||
}
|
||||
|
||||
// SignWithKey auth message with private key and set public key in auth message's Identity
|
||||
@@ -140,11 +156,14 @@ func (i *Identity) Hash() ([]byte, error) {
|
||||
// ProofMsg is the data structure sent to the coordinator.
|
||||
type ProofMsg struct {
|
||||
*ProofDetail `json:"zkProof"`
|
||||
// Roller signature
|
||||
// Prover signature
|
||||
Signature string `json:"signature"`
|
||||
|
||||
// Roller public key
|
||||
// Prover public key
|
||||
publicKey string
|
||||
|
||||
// jwt
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
// Sign signs the ProofMsg.
|
||||
@@ -204,13 +223,13 @@ func (a *ProofMsg) PublicKey() (string, error) {
|
||||
type TaskMsg struct {
|
||||
ID string `json:"id"`
|
||||
Type ProofType `json:"type,omitempty"`
|
||||
// For decentralization, basic rollers will get block hashes from the coordinator. So that they can refer to the block hashes and fetch traces locally. Only applicable for basic rollers.
|
||||
// For decentralization, basic provers will get block hashes from the coordinator. So that they can refer to the block hashes and fetch traces locally. Only applicable for basic provers.
|
||||
BlockHashes []common.Hash `json:"block_hashes,omitempty"`
|
||||
// Only applicable for aggregator rollers.
|
||||
// Only applicable for aggregator provers.
|
||||
SubProofs []*AggProof `json:"sub_proofs,omitempty"`
|
||||
}
|
||||
|
||||
// ProofDetail is the message received from rollers that contains zk proof, the status of
|
||||
// ProofDetail is the message received from provers that contains zk proof, the status of
|
||||
// the proof generation succeeded, and an error message if proof generation failed.
|
||||
type ProofDetail struct {
|
||||
ID string `json:"id"`
|
||||
|
||||
@@ -3,6 +3,7 @@ package message
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
@@ -38,16 +39,19 @@ func TestAuthMessageSignAndVerify(t *testing.T) {
|
||||
assert.Equal(t, pub, common.Bytes2Hex(pubkey))
|
||||
}
|
||||
|
||||
func TestGenerateToken(t *testing.T) {
|
||||
token, err := GenerateToken()
|
||||
func TestJwtToken(t *testing.T) {
|
||||
var secret = []byte("secret")
|
||||
token, err := GenerateToken(time.Duration(1), secret)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 32, len(token))
|
||||
ok, err := VerifyToken(secret, token)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
|
||||
func TestIdentityHash(t *testing.T) {
|
||||
identity := &Identity{
|
||||
Name: "testName",
|
||||
RollerType: ProofTypeChunk,
|
||||
ProverType: ProofTypeChunk,
|
||||
Version: "testVersion",
|
||||
Token: "testToken",
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@ var (
|
||||
CoordinatorApp MockAppName = "coordinator-test"
|
||||
// DBCliApp the name of mock database app.
|
||||
DBCliApp MockAppName = "db_cli-test"
|
||||
// RollerApp the name of mock roller app.
|
||||
RollerApp MockAppName = "roller-test"
|
||||
// ProverApp the name of mock prover app.
|
||||
ProverApp MockAppName = "prover-test"
|
||||
)
|
||||
|
||||
// RegisterSimulation register initializer function for integration-test.
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v4.0.26"
|
||||
var tag = "v4.1.02"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
@@ -25,5 +25,5 @@ var commit = func() string {
|
||||
// ZkVersion is commit-id of common/libzkp/impl/cargo.lock/scroll-prover
|
||||
var ZkVersion string
|
||||
|
||||
// Version denote the version of scroll protocol, including the l2geth, relayer, coordinator, roller, contracts and etc.
|
||||
// Version denote the version of scroll protocol, including the l2geth, relayer, coordinator, prover, contracts and etc.
|
||||
var Version = fmt.Sprintf("%s-%s-%s", tag, commit, ZkVersion)
|
||||
|
||||
@@ -24,7 +24,7 @@ The execution in layer 2 may be failed due to out of gas problem. In such case,
|
||||
|
||||
### Send Message from L2 to L1
|
||||
|
||||
Similar to sending message from L1 to L2, you should call `L2ScrollMessenger.sendMessage` first in layer 2. The `L2ScrollMessenger` contract will emit a `SentMessage` event, which will be notified by the Sequencer. Unlike above, the Sequencer will first batch submit layer 2 transactions (or block) to `ZKRollup` contract in layer 1. Then the Sequencer will wait the proof generated by roller and submit the proof to `ZKRollup` contract in layer 1 again. Finally, anyone can call `L1ScrollMessenger.relayMessageWithProof` with correct proof to execute the message in layer 1.
|
||||
Similar to sending message from L1 to L2, you should call `L2ScrollMessenger.sendMessage` first in layer 2. The `L2ScrollMessenger` contract will emit a `SentMessage` event, which will be notified by the Sequencer. Unlike above, the Sequencer will first batch submit layer 2 transactions (or block) to `ZKRollup` contract in layer 1. Then the Sequencer will wait the proof generated by prover and submit the proof to `ZKRollup` contract in layer 1 again. Finally, anyone can call `L1ScrollMessenger.relayMessageWithProof` with correct proof to execute the message in layer 1.
|
||||
|
||||
Currently, for the safety reason, we only allow privileged contracts to send cross domain messages. And only privileged accounts can call `L2ScrollMessenger.relayMessage`.
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ make lint
|
||||
|
||||
## Configure
|
||||
|
||||
The coordinator behavior can be configured using [`config.json`](config.json). Check the code comments under `RollerManagerConfig` in [`config/config.go`](config/config.go) for more details.
|
||||
The coordinator behavior can be configured using [`config.json`](config.json). Check the code comments under `ProverManagerConfig` in [`config/config.go`](config/config.go) for more details.
|
||||
|
||||
|
||||
## Start
|
||||
@@ -59,22 +59,22 @@ The coordinator behavior can be configured using [`config.json`](config.json). C
|
||||
|
||||
### cmd/app/app.go
|
||||
|
||||
This file defines the main entry point for the coordinator application, setting up the necessary modules, and handling graceful shutdowns. Upon loading config.json file, the coordinator (`cmd/app/app.go`) sets up and starts the HTTP and WebSocket servers using the configured ports and addresses. `flags.go` is used to parse the flags. Then, it creates a new `RollerManager` (`manager.go`) and starts listening.
|
||||
This file defines the main entry point for the coordinator application, setting up the necessary modules, and handling graceful shutdowns. Upon loading config.json file, the coordinator (`cmd/app/app.go`) sets up and starts the HTTP and WebSocket servers using the configured ports and addresses. `flags.go` is used to parse the flags, then starts listening.
|
||||
|
||||
### manager.go
|
||||
|
||||
`manager.go` calls `rollers.go` for prover (aka "roller") management functions. In the process, `rollers.go` calls `client.go`, initializing a prover client. For communication between prover clients and the coordinator manager, `api.go` is used.
|
||||
`manager.go` calls `provers.go` for prover (aka "prover") management functions. In the process, `provers.go` calls `client.go`, initializing a prover client. For communication between prover clients and the coordinator manager, `api.go` is used.
|
||||
|
||||
`manager.go` uses either `verifier.go` or `mock.go` (for development/testing purposes) to verify the proofs submitted by provers. After verification, `manager.go` will call `roller.go` to update the state of the prover, and then return the result (whether the proof verification process was successful) to the prover.
|
||||
`manager.go` uses either `verifier.go` or `mock.go` (for development/testing purposes) to verify the proofs submitted by provers. After verification, `manager.go` will call `prover.go` to update the state of the prover, and then return the result (whether the proof verification process was successful) to the prover.
|
||||
|
||||
### api.go
|
||||
|
||||
This file contains the implementation of the RPC API for the coordinator manager. The API allows prover clients to interact with the coordinator manager through functions such as `requestToken`, `register`, and `submitProof`.
|
||||
|
||||
### rollers.go
|
||||
### provers.go
|
||||
|
||||
This file contains the logic for handling prover-specific tasks, such as assigning tasks to provers, handling completed tasks, and managing prover metrics.
|
||||
|
||||
### client/client.go
|
||||
|
||||
This file contains the `Client` struct that is callable on the prover side and responsible for communicating with the coordinator through RPC. `RequestToken`, `RegisterAndSubscribe`, and `SubmitProof` are used by `rollers.go`.
|
||||
This file contains the `Client` struct that is callable on the prover side and responsible for communicating with the coordinator through RPC. `RequestToken`, `RegisterAndSubscribe`, and `SubmitProof` are used by `provers.go`.
|
||||
|
||||
@@ -33,19 +33,19 @@ func NewClient(c *rpc.Client) *Client {
|
||||
return &Client{client: c}
|
||||
}
|
||||
|
||||
// RequestToken generates token for roller
|
||||
// RequestToken generates token for prover
|
||||
func (c *Client) RequestToken(ctx context.Context, authMsg *message.AuthMsg) (string, error) {
|
||||
var token string
|
||||
err := c.client.CallContext(ctx, &token, "roller_requestToken", authMsg)
|
||||
err := c.client.CallContext(ctx, &token, "prover_requestToken", authMsg)
|
||||
return token, err
|
||||
}
|
||||
|
||||
// RegisterAndSubscribe subscribe roller and register, verified by sign data.
|
||||
// RegisterAndSubscribe subscribe prover and register, verified by sign data.
|
||||
func (c *Client) RegisterAndSubscribe(ctx context.Context, taskCh chan *message.TaskMsg, authMsg *message.AuthMsg) (ethereum.Subscription, error) {
|
||||
return c.client.Subscribe(ctx, "roller", taskCh, "register", authMsg)
|
||||
return c.client.Subscribe(ctx, "prover", taskCh, "register", authMsg)
|
||||
}
|
||||
|
||||
// SubmitProof get proof from roller.
|
||||
// SubmitProof get proof from prover.
|
||||
func (c *Client) SubmitProof(ctx context.Context, proof *message.ProofMsg) error {
|
||||
return c.client.CallContext(ctx, nil, "roller_submitProof", proof)
|
||||
return c.client.CallContext(ctx, nil, "prover_submitProof", proof)
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import (
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/controller/cron"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
@@ -56,7 +56,7 @@ func action(ctx *cli.Context) error {
|
||||
|
||||
proofCollector := cron.NewCollector(subCtx, db, cfg)
|
||||
|
||||
rollermanager.InitRollerManager(db)
|
||||
provermanager.InitProverManager(db)
|
||||
|
||||
defer func() {
|
||||
proofCollector.Stop()
|
||||
@@ -84,7 +84,7 @@ func action(ctx *cli.Context) error {
|
||||
}
|
||||
// Register api and start ws service.
|
||||
if ctx.Bool(wsEnabledFlag.Name) {
|
||||
handler, addr, err := utils.StartWSEndpoint(fmt.Sprintf("%s:%d", ctx.String(wsListenAddrFlag.Name), ctx.Int(wsPortFlag.Name)), apis, cfg.RollerManagerConfig.CompressionLevel)
|
||||
handler, addr, err := utils.StartWSEndpoint(fmt.Sprintf("%s:%d", ctx.String(wsListenAddrFlag.Name), ctx.Int(wsPortFlag.Name)), apis, cfg.ProverManagerConfig.CompressionLevel)
|
||||
if err != nil {
|
||||
log.Crit("Could not start WS api", "error", err)
|
||||
}
|
||||
|
||||
@@ -79,9 +79,9 @@ func (c *CoordinatorApp) MockConfig(store bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Reset roller manager config for manager test cases.
|
||||
cfg.RollerManagerConfig = &coordinatorConfig.RollerManagerConfig{
|
||||
RollersPerSession: 1,
|
||||
// Reset prover manager config for manager test cases.
|
||||
cfg.ProverManagerConfig = &coordinatorConfig.ProverManagerConfig{
|
||||
ProversPerSession: 1,
|
||||
Verifier: &coordinatorConfig.VerifierConfig{MockMode: true},
|
||||
CollectionTime: 1,
|
||||
TokenTimeToLive: 1,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"roller_manager_config": {
|
||||
"prover_manager_config": {
|
||||
"compression_level": 9,
|
||||
"rollers_per_session": 1,
|
||||
"provers_per_session": 1,
|
||||
"session_attempts": 2,
|
||||
"collection_time": 180,
|
||||
"token_time_to_live": 60,
|
||||
@@ -11,7 +11,8 @@
|
||||
"agg_vk_path": ""
|
||||
},
|
||||
"max_verifier_workers": 10,
|
||||
"order_session": "ASC"
|
||||
"order_session": "ASC",
|
||||
"jwt_secret": "jwt"
|
||||
},
|
||||
"db_config": {
|
||||
"driver_name": "postgres",
|
||||
|
||||
@@ -5,7 +5,6 @@ go 1.19
|
||||
require (
|
||||
github.com/agiledragon/gomonkey/v2 v2.9.0
|
||||
github.com/orcaman/concurrent-map v1.0.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230613025759-f055f50f9d56
|
||||
github.com/shopspring/decimal v1.3.1
|
||||
github.com/stretchr/testify v1.8.3
|
||||
|
||||
@@ -63,8 +63,6 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
|
||||
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
|
||||
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
|
||||
@@ -15,13 +15,13 @@ const (
|
||||
defaultNumberOfSessionRetryAttempts = 2
|
||||
)
|
||||
|
||||
// RollerManagerConfig loads sequencer configuration items.
|
||||
type RollerManagerConfig struct {
|
||||
// ProverManagerConfig loads sequencer configuration items.
|
||||
type ProverManagerConfig struct {
|
||||
CompressionLevel int `json:"compression_level,omitempty"`
|
||||
// asc or desc (default: asc)
|
||||
OrderSession string `json:"order_session,omitempty"`
|
||||
// The amount of rollers to pick per proof generation session.
|
||||
RollersPerSession uint8 `json:"rollers_per_session"`
|
||||
// The amount of provers to pick per proof generation session.
|
||||
ProversPerSession uint8 `json:"provers_per_session"`
|
||||
// Number of attempts that a session can be retried if previous attempts failed.
|
||||
// Currently we only consider proving timeout as failure here.
|
||||
SessionAttempts uint8 `json:"session_attempts,omitempty"`
|
||||
@@ -29,10 +29,12 @@ type RollerManagerConfig struct {
|
||||
Verifier *VerifierConfig `json:"verifier,omitempty"`
|
||||
// Proof collection time (in minutes).
|
||||
CollectionTime int `json:"collection_time"`
|
||||
// Token time to live (in seconds)
|
||||
// Token time to live (in Seconds)
|
||||
TokenTimeToLive int `json:"token_time_to_live"`
|
||||
// Max number of workers in verifier worker pool
|
||||
MaxVerifierWorkers int `json:"max_verifier_workers,omitempty"`
|
||||
// jwt secret
|
||||
JwtSecret string `json:"jwt_secret"`
|
||||
}
|
||||
|
||||
// L2Config loads l2geth configuration items.
|
||||
@@ -43,7 +45,7 @@ type L2Config struct {
|
||||
|
||||
// Config load configuration items.
|
||||
type Config struct {
|
||||
RollerManagerConfig *RollerManagerConfig `json:"roller_manager_config"`
|
||||
ProverManagerConfig *ProverManagerConfig `json:"prover_manager_config"`
|
||||
DBConfig *database.Config `json:"db_config"`
|
||||
L2Config *L2Config `json:"l2_config"`
|
||||
}
|
||||
@@ -68,18 +70,18 @@ func NewConfig(file string) (*Config, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check roller's order session
|
||||
order := strings.ToUpper(cfg.RollerManagerConfig.OrderSession)
|
||||
// Check prover's order session
|
||||
order := strings.ToUpper(cfg.ProverManagerConfig.OrderSession)
|
||||
if len(order) > 0 && !(order == "ASC" || order == "DESC") {
|
||||
return nil, errors.New("roller config's order session is invalid")
|
||||
return nil, errors.New("prover manager config's order session is invalid")
|
||||
}
|
||||
cfg.RollerManagerConfig.OrderSession = order
|
||||
cfg.ProverManagerConfig.OrderSession = order
|
||||
|
||||
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
|
||||
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
|
||||
if cfg.ProverManagerConfig.MaxVerifierWorkers == 0 {
|
||||
cfg.ProverManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
|
||||
}
|
||||
if cfg.RollerManagerConfig.SessionAttempts == 0 {
|
||||
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
|
||||
if cfg.ProverManagerConfig.SessionAttempts == 0 {
|
||||
cfg.ProverManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
|
||||
@@ -12,9 +12,9 @@ import (
|
||||
|
||||
func TestConfig(t *testing.T) {
|
||||
configTemplate := `{
|
||||
"roller_manager_config": {
|
||||
"prover_manager_config": {
|
||||
"compression_level": 9,
|
||||
"rollers_per_session": 1,
|
||||
"provers_per_session": 1,
|
||||
"session_attempts": %d,
|
||||
"collection_time": 180,
|
||||
"token_time_to_live": 60,
|
||||
@@ -100,7 +100,7 @@ func TestConfig(t *testing.T) {
|
||||
|
||||
_, err = NewConfig(tmpFile.Name())
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "roller config's order session is invalid")
|
||||
assert.Contains(t, err.Error(), "prover manager config's order session is invalid")
|
||||
})
|
||||
|
||||
t.Run("Default MaxVerifierWorkers", func(t *testing.T) {
|
||||
@@ -116,7 +116,7 @@ func TestConfig(t *testing.T) {
|
||||
|
||||
cfg, err := NewConfig(tmpFile.Name())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, defaultNumberOfVerifierWorkers, cfg.RollerManagerConfig.MaxVerifierWorkers)
|
||||
assert.Equal(t, defaultNumberOfVerifierWorkers, cfg.ProverManagerConfig.MaxVerifierWorkers)
|
||||
})
|
||||
|
||||
t.Run("Default SessionAttempts", func(t *testing.T) {
|
||||
@@ -132,6 +132,6 @@ func TestConfig(t *testing.T) {
|
||||
|
||||
cfg, err := NewConfig(tmpFile.Name())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, uint8(defaultNumberOfSessionRetryAttempts), cfg.RollerManagerConfig.SessionAttempts)
|
||||
assert.Equal(t, uint8(defaultNumberOfSessionRetryAttempts), cfg.ProverManagerConfig.SessionAttempts)
|
||||
})
|
||||
}
|
||||
|
||||
94
coordinator/internal/controller/api/prover.go
Normal file
94
coordinator/internal/controller/api/prover.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/proof"
|
||||
)
|
||||
|
||||
// ProverController the prover api controller
|
||||
type ProverController struct {
|
||||
proofReceiver *proof.ZKProofReceiver
|
||||
taskWorker *proof.TaskWorker
|
||||
tokenExpire time.Duration
|
||||
jwtSecret []byte
|
||||
}
|
||||
|
||||
// NewProverController create a prover controller
|
||||
func NewProverController(cfg *config.ProverManagerConfig, db *gorm.DB) *ProverController {
|
||||
tokenExpire := time.Duration(cfg.TokenTimeToLive) * time.Second
|
||||
return &ProverController{
|
||||
proofReceiver: proof.NewZKProofReceiver(cfg, db),
|
||||
taskWorker: proof.NewTaskWorker(),
|
||||
tokenExpire: tokenExpire,
|
||||
jwtSecret: []byte(cfg.JwtSecret),
|
||||
}
|
||||
}
|
||||
|
||||
// RequestToken get request token of authMsg
|
||||
func (r *ProverController) RequestToken(authMsg *message.AuthMsg) (string, error) {
|
||||
if ok, err := authMsg.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify auth message", "error", err)
|
||||
}
|
||||
return "", errors.New("signature verification failed")
|
||||
}
|
||||
token, err := message.GenerateToken(r.tokenExpire, r.jwtSecret)
|
||||
if err != nil {
|
||||
return "", errors.New("token generation failed")
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
// VerifyToken verifies JWT for token and expiration time
|
||||
func (r *ProverController) verifyToken(tokenStr string) (bool, error) {
|
||||
return message.VerifyToken(r.jwtSecret, tokenStr)
|
||||
}
|
||||
|
||||
// Register register api for prover
|
||||
func (r *ProverController) Register(ctx context.Context, authMsg *message.AuthMsg) (*rpc.Subscription, error) {
|
||||
// Verify register message.
|
||||
if ok, err := authMsg.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify auth message", "error", err)
|
||||
}
|
||||
return nil, errors.New("signature verification failed")
|
||||
}
|
||||
// Verify the jwt
|
||||
if ok, err := r.verifyToken(authMsg.Identity.Token); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rpcSub, err := r.taskWorker.AllocTaskWorker(ctx, authMsg)
|
||||
if err != nil {
|
||||
return rpcSub, err
|
||||
}
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
||||
// SubmitProof prover pull proof
|
||||
func (r *ProverController) SubmitProof(proof *message.ProofMsg) error {
|
||||
// Verify the signature
|
||||
if ok, err := proof.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify proof message", "error", err)
|
||||
}
|
||||
return errors.New("auth signature verify fail")
|
||||
}
|
||||
|
||||
err := r.proofReceiver.HandleZkProof(context.Background(), proof)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/proof"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/logic/verifier"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
func geneAuthMsg(t *testing.T) (*message.AuthMsg, *ecdsa.PrivateKey) {
|
||||
authMsg := &message.AuthMsg{
|
||||
Identity: &message.Identity{
|
||||
Name: "roller_test1",
|
||||
Name: "prover_test1",
|
||||
},
|
||||
}
|
||||
privKey, err := crypto.GenerateKey()
|
||||
@@ -39,37 +39,33 @@ func geneAuthMsg(t *testing.T) (*message.AuthMsg, *ecdsa.PrivateKey) {
|
||||
return authMsg, privKey
|
||||
}
|
||||
|
||||
var rollerController *RollerController
|
||||
var proverController *ProverController
|
||||
|
||||
func init() {
|
||||
conf := &config.RollerManagerConfig{
|
||||
conf := &config.ProverManagerConfig{
|
||||
TokenTimeToLive: 120,
|
||||
}
|
||||
conf.Verifier = &config.VerifierConfig{MockMode: true}
|
||||
rollerController = NewRollerController(conf, nil)
|
||||
proverController = NewProverController(conf, nil)
|
||||
}
|
||||
|
||||
func TestRoller_RequestToken(t *testing.T) {
|
||||
func TestProver_RequestToken(t *testing.T) {
|
||||
convey.Convey("auth msg verify failure", t, func() {
|
||||
tmpAuthMsg := &message.AuthMsg{
|
||||
Identity: &message.Identity{
|
||||
Name: "roller_test_request_token",
|
||||
Name: "prover_test_request_token",
|
||||
},
|
||||
}
|
||||
token, err := rollerController.RequestToken(tmpAuthMsg)
|
||||
token, err := proverController.RequestToken(tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, token)
|
||||
})
|
||||
|
||||
convey.Convey("token has already been distributed", t, func() {
|
||||
tmpAuthMsg, _ := geneAuthMsg(t)
|
||||
key, err := tmpAuthMsg.PublicKey()
|
||||
token, err := proverController.RequestToken(tmpAuthMsg)
|
||||
assert.NoError(t, err)
|
||||
tokenCacheStored := "c393987bb791dd285dd3d8ffbd770ed1"
|
||||
rollerController.tokenCache.Set(key, tokenCacheStored, time.Hour)
|
||||
token, err := rollerController.RequestToken(tmpAuthMsg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, token, tokenCacheStored)
|
||||
t.Log("token is ", token)
|
||||
})
|
||||
|
||||
convey.Convey("token generation failure", t, func() {
|
||||
@@ -78,7 +74,7 @@ func TestRoller_RequestToken(t *testing.T) {
|
||||
return "", errors.New("token generation failed")
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
token, err := rollerController.RequestToken(tmpAuthMsg)
|
||||
token, err := proverController.RequestToken(tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, token)
|
||||
})
|
||||
@@ -90,45 +86,45 @@ func TestRoller_RequestToken(t *testing.T) {
|
||||
return tokenCacheStored, nil
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
token, err := rollerController.RequestToken(tmpAuthMsg)
|
||||
token, err := proverController.RequestToken(tmpAuthMsg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tokenCacheStored, token)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRoller_Register(t *testing.T) {
|
||||
func TestProver_Register(t *testing.T) {
|
||||
convey.Convey("auth msg verify failure", t, func() {
|
||||
tmpAuthMsg := &message.AuthMsg{
|
||||
Identity: &message.Identity{
|
||||
Name: "roller_test_register",
|
||||
Name: "prover_test_register",
|
||||
},
|
||||
}
|
||||
subscription, err := rollerController.Register(context.Background(), tmpAuthMsg)
|
||||
subscription, err := proverController.Register(context.Background(), tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, subscription)
|
||||
})
|
||||
|
||||
convey.Convey("verify token failure", t, func() {
|
||||
tmpAuthMsg, _ := geneAuthMsg(t)
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(rollerController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(proverController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
return false, errors.New("verify token failure")
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
subscription, err := rollerController.Register(context.Background(), tmpAuthMsg)
|
||||
subscription, err := proverController.Register(context.Background(), tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, subscription)
|
||||
})
|
||||
|
||||
convey.Convey("notifier failure", t, func() {
|
||||
tmpAuthMsg, _ := geneAuthMsg(t)
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(rollerController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(proverController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
return true, nil
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
patchGuard.ApplyFunc(rpc.NotifierFromContext, func(ctx context.Context) (*rpc.Notifier, bool) {
|
||||
return nil, false
|
||||
})
|
||||
subscription, err := rollerController.Register(context.Background(), tmpAuthMsg)
|
||||
subscription, err := proverController.Register(context.Background(), tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, err, rpc.ErrNotificationsUnsupported)
|
||||
assert.Equal(t, *subscription, rpc.Subscription{})
|
||||
@@ -136,7 +132,7 @@ func TestRoller_Register(t *testing.T) {
|
||||
|
||||
convey.Convey("register failure", t, func() {
|
||||
tmpAuthMsg, _ := geneAuthMsg(t)
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(rollerController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(proverController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
return true, nil
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
@@ -145,14 +141,14 @@ func TestRoller_Register(t *testing.T) {
|
||||
patchGuard.ApplyPrivateMethod(taskWorker, "AllocTaskWorker", func(ctx context.Context, authMsg *message.AuthMsg) (*rpc.Subscription, error) {
|
||||
return nil, errors.New("register error")
|
||||
})
|
||||
subscription, err := rollerController.Register(context.Background(), tmpAuthMsg)
|
||||
subscription, err := proverController.Register(context.Background(), tmpAuthMsg)
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, subscription)
|
||||
})
|
||||
|
||||
convey.Convey("register success", t, func() {
|
||||
tmpAuthMsg, _ := geneAuthMsg(t)
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(rollerController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
patchGuard := gomonkey.ApplyPrivateMethod(proverController, "verifyToken", func(tmpAuthMsg *message.AuthMsg) (bool, error) {
|
||||
return true, nil
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
@@ -161,17 +157,17 @@ func TestRoller_Register(t *testing.T) {
|
||||
patchGuard.ApplyPrivateMethod(taskWorker, "AllocTaskWorker", func(ctx context.Context, authMsg *message.AuthMsg) (*rpc.Subscription, error) {
|
||||
return nil, nil
|
||||
})
|
||||
_, err := rollerController.Register(context.Background(), tmpAuthMsg)
|
||||
_, err := proverController.Register(context.Background(), tmpAuthMsg)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRoller_SubmitProof(t *testing.T) {
|
||||
func TestProver_SubmitProof(t *testing.T) {
|
||||
tmpAuthMsg, prvKey := geneAuthMsg(t)
|
||||
pubKey, err := tmpAuthMsg.PublicKey()
|
||||
assert.NoError(t, err)
|
||||
|
||||
id := "rollers_info_test"
|
||||
id := "provers_info_test"
|
||||
tmpProof := &message.ProofMsg{
|
||||
ProofDetail: &message.ProofDetail{
|
||||
Type: message.ProofTypeChunk,
|
||||
@@ -191,9 +187,9 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
})
|
||||
defer patchGuard.Reset()
|
||||
|
||||
rollermanager.InitRollerManager(nil)
|
||||
provermanager.InitProverManager(nil)
|
||||
|
||||
taskChan, err := rollermanager.Manager.Register(context.Background(), pubKey, tmpAuthMsg.Identity)
|
||||
taskChan, err := provermanager.Manager.Register(context.Background(), pubKey, tmpAuthMsg.Identity)
|
||||
assert.NotNil(t, taskChan)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -202,7 +198,7 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
patchGuard.ApplyMethodFunc(s, "Verify", func() (bool, error) {
|
||||
return false, errors.New("proof verify error")
|
||||
})
|
||||
err = rollerController.SubmitProof(tmpProof)
|
||||
err = proverController.SubmitProof(tmpProof)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@@ -227,7 +223,7 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
convey.Convey("get none rollers of prover task", t, func() {
|
||||
convey.Convey("get none provers of prover task", t, func() {
|
||||
patchGuard.ApplyMethodFunc(proverTaskOrm, "GetProverTaskByTaskIDAndPubKey", func(ctx context.Context, hash, pubKey string) (*orm.ProverTask, error) {
|
||||
return nil, nil
|
||||
})
|
||||
@@ -243,7 +239,7 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
tmpProof1.Sign(privKey)
|
||||
_, err1 := tmpProof1.PublicKey()
|
||||
assert.NoError(t, err1)
|
||||
err2 := rollerController.SubmitProof(tmpProof1)
|
||||
err2 := proverController.SubmitProof(tmpProof1)
|
||||
fmt.Println(err2)
|
||||
targetErr := fmt.Errorf("validator failure get none prover task for the proof")
|
||||
assert.Equal(t, err2.Error(), targetErr.Error())
|
||||
@@ -255,23 +251,23 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
TaskID: id,
|
||||
ProverPublicKey: proofPubKey,
|
||||
TaskType: int16(message.ProofTypeChunk),
|
||||
ProverName: "rollers_info_test",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProverName: "provers_info_test",
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
CreatedAt: now,
|
||||
}
|
||||
return s, nil
|
||||
})
|
||||
|
||||
patchGuard.ApplyMethodFunc(proverTaskOrm, "UpdateProverTaskProvingStatus", func(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.RollerProveStatus, dbTX ...*gorm.DB) error {
|
||||
patchGuard.ApplyMethodFunc(proverTaskOrm, "UpdateProverTaskProvingStatus", func(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
patchGuard.ApplyPrivateMethod(rollerController.proofReceiver, "proofFailure", func(hash string, pubKey string, proofMsgType message.ProofType) {
|
||||
patchGuard.ApplyPrivateMethod(proverController.proofReceiver, "proofFailure", func(hash string, pubKey string, proofMsgType message.ProofType) {
|
||||
})
|
||||
|
||||
convey.Convey("proof msg status is not ok", t, func() {
|
||||
tmpProof.Status = message.StatusProofError
|
||||
err1 := rollerController.SubmitProof(tmpProof)
|
||||
err1 := proverController.SubmitProof(tmpProof)
|
||||
assert.NoError(t, err1)
|
||||
})
|
||||
tmpProof.Status = message.StatusOk
|
||||
@@ -287,7 +283,7 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
patchGuard.ApplyMethodFunc(tmpVerifier, "VerifyProof", func(proof *message.AggProof) (bool, error) {
|
||||
return false, targetErr
|
||||
})
|
||||
err1 := rollerController.SubmitProof(tmpProof)
|
||||
err1 := proverController.SubmitProof(tmpProof)
|
||||
assert.Nil(t, err1)
|
||||
})
|
||||
|
||||
@@ -295,10 +291,10 @@ func TestRoller_SubmitProof(t *testing.T) {
|
||||
return true, nil
|
||||
})
|
||||
|
||||
patchGuard.ApplyPrivateMethod(rollerController.proofReceiver, "closeProofTask", func(hash string, pubKey string, proofMsg *message.ProofMsg, rollersInfo *coordinatorType.RollersInfo) error {
|
||||
patchGuard.ApplyPrivateMethod(proverController.proofReceiver, "closeProofTask", func(hash string, pubKey string, proofMsg *message.ProofMsg, proversInfo *coordinatorType.ProversInfo) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
err1 := rollerController.SubmitProof(tmpProof)
|
||||
err1 := proverController.SubmitProof(tmpProof)
|
||||
assert.Nil(t, err1)
|
||||
}
|
||||
@@ -1,115 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/proof"
|
||||
)
|
||||
|
||||
// RollerController the roller api controller
|
||||
type RollerController struct {
|
||||
tokenCache *cache.Cache
|
||||
proofReceiver *proof.ZKProofReceiver
|
||||
taskWorker *proof.TaskWorker
|
||||
}
|
||||
|
||||
// NewRollerController create a roller controller
|
||||
func NewRollerController(cfg *config.RollerManagerConfig, db *gorm.DB) *RollerController {
|
||||
return &RollerController{
|
||||
proofReceiver: proof.NewZKProofReceiver(cfg, db),
|
||||
taskWorker: proof.NewTaskWorker(),
|
||||
tokenCache: cache.New(time.Duration(cfg.TokenTimeToLive)*time.Second, 1*time.Hour),
|
||||
}
|
||||
}
|
||||
|
||||
// RequestToken get request token of authMsg
|
||||
func (r *RollerController) RequestToken(authMsg *message.AuthMsg) (string, error) {
|
||||
if ok, err := authMsg.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify auth message", "error", err)
|
||||
}
|
||||
return "", errors.New("signature verification failed")
|
||||
}
|
||||
pubkey, err := authMsg.PublicKey()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("RequestToken auth msg public key error:%w", err)
|
||||
}
|
||||
if token, ok := r.tokenCache.Get(pubkey); ok {
|
||||
return token.(string), nil
|
||||
}
|
||||
token, err := message.GenerateToken()
|
||||
if err != nil {
|
||||
return "", errors.New("token generation failed")
|
||||
}
|
||||
r.tokenCache.SetDefault(pubkey, token)
|
||||
return token, nil
|
||||
}
|
||||
|
||||
// VerifyToken verifies pubkey for token and expiration time
|
||||
func (r *RollerController) verifyToken(authMsg *message.AuthMsg) (bool, error) {
|
||||
pubkey, err := authMsg.PublicKey()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("verify token auth msg public key error:%w", err)
|
||||
}
|
||||
// GetValue returns nil if value is expired
|
||||
if token, ok := r.tokenCache.Get(pubkey); !ok || token != authMsg.Identity.Token {
|
||||
return false, fmt.Errorf("failed to find corresponding token. roller name: %s roller pk: %s", authMsg.Identity.Name, pubkey)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Register register api for roller
|
||||
func (r *RollerController) Register(ctx context.Context, authMsg *message.AuthMsg) (*rpc.Subscription, error) {
|
||||
// Verify register message.
|
||||
if ok, err := authMsg.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify auth message", "error", err)
|
||||
}
|
||||
return nil, errors.New("signature verification failed")
|
||||
}
|
||||
// Lock here to avoid malicious roller message replay before cleanup of token
|
||||
if ok, err := r.verifyToken(authMsg); !ok {
|
||||
return nil, err
|
||||
}
|
||||
pubkey, err := authMsg.PublicKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("register auth msg public key error:%w", err)
|
||||
}
|
||||
// roller successfully registered, remove token associated with this roller
|
||||
r.tokenCache.Delete(pubkey)
|
||||
|
||||
rpcSub, err := r.taskWorker.AllocTaskWorker(ctx, authMsg)
|
||||
if err != nil {
|
||||
return rpcSub, err
|
||||
}
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
||||
// SubmitProof roller pull proof
|
||||
func (r *RollerController) SubmitProof(proof *message.ProofMsg) error {
|
||||
// Verify the signature
|
||||
if ok, err := proof.Verify(); !ok {
|
||||
if err != nil {
|
||||
log.Error("failed to verify proof message", "error", err)
|
||||
}
|
||||
return errors.New("auth signature verify fail")
|
||||
}
|
||||
|
||||
err := r.proofReceiver.HandleZkProof(context.Background(), proof)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
)
|
||||
|
||||
// RollerAPI for rollers inorder to register and submit proof
|
||||
type RollerAPI interface {
|
||||
// ProverAPI for provers inorder to register and submit proof
|
||||
type ProverAPI interface {
|
||||
RequestToken(authMsg *message.AuthMsg) (string, error)
|
||||
Register(ctx context.Context, authMsg *message.AuthMsg) (*rpc.Subscription, error)
|
||||
SubmitProof(proof *message.ProofMsg) error
|
||||
@@ -22,8 +22,8 @@ type RollerAPI interface {
|
||||
func RegisterAPIs(cfg *config.Config, db *gorm.DB) []rpc.API {
|
||||
return []rpc.API{
|
||||
{
|
||||
Namespace: "roller",
|
||||
Service: RollerAPI(NewRollerController(cfg.RollerManagerConfig, db)),
|
||||
Namespace: "prover",
|
||||
Service: ProverAPI(NewProverController(cfg.ProverManagerConfig, db)),
|
||||
Public: true,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -114,16 +114,16 @@ func (c *Collector) timeoutProofTask() {
|
||||
}
|
||||
|
||||
for _, assignedProverTask := range assignedProverTasks {
|
||||
timeoutDuration := time.Duration(c.cfg.RollerManagerConfig.CollectionTime) * time.Minute
|
||||
timeoutDuration := time.Duration(c.cfg.ProverManagerConfig.CollectionTime) * time.Minute
|
||||
// here not update the block batch proving status failed, because the collector loop will check
|
||||
// the attempt times. if reach the times, the collector will set the block batch proving status.
|
||||
if time.Since(assignedProverTask.AssignedAt) >= timeoutDuration {
|
||||
log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID,
|
||||
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
|
||||
err = c.db.Transaction(func(tx *gorm.DB) error {
|
||||
// update prover task proving status as RollerProofInvalid
|
||||
// update prover task proving status as ProverProofInvalid
|
||||
if err = c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
|
||||
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.RollerProofInvalid, tx); err != nil {
|
||||
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
|
||||
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
@@ -59,15 +59,15 @@ func (bp *BatchProofCollector) Collect(ctx context.Context) error {
|
||||
batchTask := batchTasks[0]
|
||||
log.Info("start batch proof generation session", "id", batchTask.Hash)
|
||||
|
||||
if rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch) == 0 {
|
||||
return fmt.Errorf("no idle common roller when starting proof generation session, id:%s", batchTask.Hash)
|
||||
if provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch) == 0 {
|
||||
return fmt.Errorf("no idle common prover when starting proof generation session, id:%s", batchTask.Hash)
|
||||
}
|
||||
|
||||
if !bp.checkAttemptsExceeded(batchTask.Hash, message.ProofTypeBatch) {
|
||||
return fmt.Errorf("the batch task id:%s check attempts have reach the maximum", batchTask.Hash)
|
||||
}
|
||||
|
||||
rollerStatusList, err := bp.sendTask(ctx, batchTask.Hash)
|
||||
proverStatusList, err := bp.sendTask(ctx, batchTask.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("send batch task id:%s err:%w", batchTask.Hash, err)
|
||||
}
|
||||
@@ -78,13 +78,13 @@ func (bp *BatchProofCollector) Collect(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to update task status, id:%s, error:%w", batchTask.Hash, err)
|
||||
}
|
||||
|
||||
for _, rollerStatus := range rollerStatusList {
|
||||
for _, proverStatus := range proverStatusList {
|
||||
proverTask := orm.ProverTask{
|
||||
TaskID: batchTask.Hash,
|
||||
ProverPublicKey: rollerStatus.PublicKey,
|
||||
ProverPublicKey: proverStatus.PublicKey,
|
||||
TaskType: int16(message.ProofTypeBatch),
|
||||
ProverName: rollerStatus.Name,
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProverName: proverStatus.Name,
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
FailureType: int16(types.ProverTaskFailureTypeUndefined),
|
||||
// here why need use UTC time. see scroll/common/databased/db.go
|
||||
AssignedAt: utils.NowUTC(),
|
||||
@@ -100,7 +100,7 @@ func (bp *BatchProofCollector) Collect(ctx context.Context) error {
|
||||
return transErr
|
||||
}
|
||||
|
||||
func (bp *BatchProofCollector) sendTask(ctx context.Context, taskID string) ([]*coordinatorType.RollerStatus, error) {
|
||||
func (bp *BatchProofCollector) sendTask(ctx context.Context, taskID string) ([]*coordinatorType.ProverStatus, error) {
|
||||
// get chunk proofs from db
|
||||
chunkProofs, err := bp.chunkOrm.GetProofsByBatchHash(ctx, taskID)
|
||||
if err != nil {
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
@@ -66,11 +66,11 @@ func (cp *ChunkProofCollector) Collect(ctx context.Context) error {
|
||||
return fmt.Errorf("chunk proof hash id:%s check attempts have reach the maximum", chunkTask.Hash)
|
||||
}
|
||||
|
||||
if rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk) == 0 {
|
||||
return fmt.Errorf("no idle chunk roller when starting proof generation session, id:%s", chunkTask.Hash)
|
||||
if provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk) == 0 {
|
||||
return fmt.Errorf("no idle chunk prover when starting proof generation session, id:%s", chunkTask.Hash)
|
||||
}
|
||||
|
||||
rollerStatusList, err := cp.sendTask(ctx, chunkTask.Hash)
|
||||
proverStatusList, err := cp.sendTask(ctx, chunkTask.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("send task failure, id:%s error:%w", chunkTask.Hash, err)
|
||||
}
|
||||
@@ -82,19 +82,19 @@ func (cp *ChunkProofCollector) Collect(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, rollerStatus := range rollerStatusList {
|
||||
for _, proverStatus := range proverStatusList {
|
||||
proverTask := orm.ProverTask{
|
||||
TaskID: chunkTask.Hash,
|
||||
ProverPublicKey: rollerStatus.PublicKey,
|
||||
ProverPublicKey: proverStatus.PublicKey,
|
||||
TaskType: int16(message.ProofTypeChunk),
|
||||
ProverName: rollerStatus.Name,
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProverName: proverStatus.Name,
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
FailureType: int16(types.ProverTaskFailureTypeUndefined),
|
||||
// here why need use UTC time. see scroll/common/databased/db.go
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
if err = cp.proverTaskOrm.SetProverTask(ctx, &proverTask, tx); err != nil {
|
||||
return fmt.Errorf("db set session info fail, session id:%s , public key:%s, err:%w", chunkTask.Hash, rollerStatus.PublicKey, err)
|
||||
return fmt.Errorf("db set session info fail, session id:%s , public key:%s, err:%w", chunkTask.Hash, proverStatus.PublicKey, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -102,7 +102,7 @@ func (cp *ChunkProofCollector) Collect(ctx context.Context) error {
|
||||
return transErr
|
||||
}
|
||||
|
||||
func (cp *ChunkProofCollector) sendTask(ctx context.Context, hash string) ([]*coordinatorType.RollerStatus, error) {
|
||||
func (cp *ChunkProofCollector) sendTask(ctx context.Context, hash string) ([]*coordinatorType.ProverStatus, error) {
|
||||
// Get block hashes.
|
||||
wrappedBlocks, err := cp.blockOrm.GetL2BlocksByChunkHash(ctx, hash)
|
||||
if err != nil {
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
@@ -56,14 +56,14 @@ func (b *BaseCollector) checkAttemptsExceeded(hash string, taskType message.Proo
|
||||
return true
|
||||
}
|
||||
|
||||
if len(proverTasks) >= int(b.cfg.RollerManagerConfig.SessionAttempts) {
|
||||
if len(proverTasks) >= int(b.cfg.ProverManagerConfig.SessionAttempts) {
|
||||
coordinatorSessionsTimeoutTotalCounter.Inc(1)
|
||||
|
||||
log.Warn("proof generation prover task %s ended because reach the max attempts", hash)
|
||||
|
||||
for _, proverTask := range proverTasks {
|
||||
if types.ProvingStatus(proverTask.ProvingStatus) == types.ProvingTaskFailed {
|
||||
rollermanager.Manager.FreeTaskIDForRoller(proverTask.ProverPublicKey, hash)
|
||||
provermanager.Manager.FreeTaskIDForProver(proverTask.ProverPublicKey, hash)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (b *BaseCollector) checkAttemptsExceeded(hash string, taskType message.Proo
|
||||
}
|
||||
}
|
||||
// update the prover task status to let timeout checker don't check it.
|
||||
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.RollerProofInvalid, tx); err != nil {
|
||||
if err := b.proverTaskOrm.UpdateAllProverTaskProvingStatusOfTaskID(b.ctx, message.ProofType(proverTasks[0].TaskType), hash, types.ProverProofInvalid, tx); err != nil {
|
||||
log.Error("failed to update prover task proving_status as failed", "msg.ID", hash, "error", err)
|
||||
}
|
||||
return nil
|
||||
@@ -91,7 +91,7 @@ func (b *BaseCollector) checkAttemptsExceeded(hash string, taskType message.Proo
|
||||
return true
|
||||
}
|
||||
|
||||
func (b *BaseCollector) sendTask(proveType message.ProofType, hash string, blockHashes []common.Hash, subProofs []*message.AggProof) ([]*coordinatorType.RollerStatus, error) {
|
||||
func (b *BaseCollector) sendTask(proveType message.ProofType, hash string, blockHashes []common.Hash, subProofs []*message.AggProof) ([]*coordinatorType.ProverStatus, error) {
|
||||
sendMsg := &message.TaskMsg{
|
||||
ID: hash,
|
||||
Type: proveType,
|
||||
@@ -100,26 +100,26 @@ func (b *BaseCollector) sendTask(proveType message.ProofType, hash string, block
|
||||
}
|
||||
|
||||
var err error
|
||||
var rollerStatusList []*coordinatorType.RollerStatus
|
||||
for i := uint8(0); i < b.cfg.RollerManagerConfig.RollersPerSession; i++ {
|
||||
rollerPubKey, rollerName, sendErr := rollermanager.Manager.SendTask(proveType, sendMsg)
|
||||
var proverStatusList []*coordinatorType.ProverStatus
|
||||
for i := uint8(0); i < b.cfg.ProverManagerConfig.ProversPerSession; i++ {
|
||||
proverPubKey, proverName, sendErr := provermanager.Manager.SendTask(proveType, sendMsg)
|
||||
if sendErr != nil {
|
||||
err = sendErr
|
||||
continue
|
||||
}
|
||||
|
||||
rollermanager.Manager.UpdateMetricRollerProofsLastAssignedTimestampGauge(rollerPubKey)
|
||||
provermanager.Manager.UpdateMetricProverProofsLastAssignedTimestampGauge(proverPubKey)
|
||||
|
||||
rollerStatus := &coordinatorType.RollerStatus{
|
||||
PublicKey: rollerPubKey,
|
||||
Name: rollerName,
|
||||
Status: types.RollerAssigned,
|
||||
proverStatus := &coordinatorType.ProverStatus{
|
||||
PublicKey: proverPubKey,
|
||||
Name: proverName,
|
||||
Status: types.ProverAssigned,
|
||||
}
|
||||
rollerStatusList = append(rollerStatusList, rollerStatus)
|
||||
proverStatusList = append(proverStatusList, proverStatus)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rollerStatusList, nil
|
||||
return proverStatusList, nil
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/logic/verifier"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
)
|
||||
@@ -33,8 +33,8 @@ var (
|
||||
ErrValidatorFailureProofMsgStatusNotOk = errors.New("validator failure proof msg status not ok")
|
||||
// ErrValidatorFailureProverTaskEmpty get none prover task
|
||||
ErrValidatorFailureProverTaskEmpty = errors.New("validator failure get none prover task for the proof")
|
||||
// ErrValidatorFailureRollerInfoHasProofValid proof is vaild
|
||||
ErrValidatorFailureRollerInfoHasProofValid = errors.New("validator failure prover task info has proof valid")
|
||||
// ErrValidatorFailureProverInfoHasProofValid proof is vaild
|
||||
ErrValidatorFailureProverInfoHasProofValid = errors.New("validator failure prover task info has proof valid")
|
||||
)
|
||||
|
||||
// ZKProofReceiver the proof receiver
|
||||
@@ -44,13 +44,13 @@ type ZKProofReceiver struct {
|
||||
proverTaskOrm *orm.ProverTask
|
||||
|
||||
db *gorm.DB
|
||||
cfg *config.RollerManagerConfig
|
||||
cfg *config.ProverManagerConfig
|
||||
|
||||
verifier *verifier.Verifier
|
||||
}
|
||||
|
||||
// NewZKProofReceiver create a proof receiver
|
||||
func NewZKProofReceiver(cfg *config.RollerManagerConfig, db *gorm.DB) *ZKProofReceiver {
|
||||
func NewZKProofReceiver(cfg *config.ProverManagerConfig, db *gorm.DB) *ZKProofReceiver {
|
||||
vf, err := verifier.NewVerifier(cfg.Verifier)
|
||||
if err != nil {
|
||||
panic("proof receiver new verifier failure")
|
||||
@@ -67,12 +67,12 @@ func NewZKProofReceiver(cfg *config.RollerManagerConfig, db *gorm.DB) *ZKProofRe
|
||||
}
|
||||
}
|
||||
|
||||
// HandleZkProof handle a ZkProof submitted from a roller.
|
||||
// HandleZkProof handle a ZkProof submitted from a prover.
|
||||
// For now only proving/verifying error will lead to setting status as skipped.
|
||||
// db/unmarshal errors will not because they are errors on the business logic side.
|
||||
func (m *ZKProofReceiver) HandleZkProof(ctx context.Context, proofMsg *message.ProofMsg) error {
|
||||
pk, _ := proofMsg.PublicKey()
|
||||
rollermanager.Manager.UpdateMetricRollerProofsLastFinishedTimestampGauge(pk)
|
||||
provermanager.Manager.UpdateMetricProverProofsLastFinishedTimestampGauge(pk)
|
||||
|
||||
proverTask, err := m.proverTaskOrm.GetProverTaskByTaskIDAndPubKey(ctx, proofMsg.ID, pk)
|
||||
if proverTask == nil || err != nil {
|
||||
@@ -126,18 +126,18 @@ func (m *ZKProofReceiver) HandleZkProof(ctx context.Context, proofMsg *message.P
|
||||
if verifyErr != nil || !success {
|
||||
if verifyErr != nil {
|
||||
// TODO: this is only a temp workaround for testnet, we should return err in real cases
|
||||
log.Error("failed to verify zk proof", "proof id", proofMsg.ID, "roller pk", pk, "prove type",
|
||||
log.Error("failed to verify zk proof", "proof id", proofMsg.ID, "prover pk", pk, "prove type",
|
||||
proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
}
|
||||
m.proofFailure(ctx, proofMsg.ID, pk, proofMsg.Type)
|
||||
|
||||
// TODO: Roller needs to be slashed if proof is invalid.
|
||||
// TODO: Prover needs to be slashed if proof is invalid.
|
||||
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
|
||||
|
||||
rollermanager.Manager.UpdateMetricRollerProofsVerifiedFailedTimeTimer(pk, proofTime)
|
||||
provermanager.Manager.UpdateMetricProverProofsVerifiedFailedTimeTimer(pk, proofTime)
|
||||
|
||||
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "roller name", proverTask.ProverName,
|
||||
"roller pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
|
||||
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ func (m *ZKProofReceiver) HandleZkProof(ctx context.Context, proofMsg *message.P
|
||||
}
|
||||
|
||||
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
|
||||
rollermanager.Manager.UpdateMetricRollerProofsVerifiedSuccessTimeTimer(pk, proofTime)
|
||||
provermanager.Manager.UpdateMetricProverProofsVerifiedSuccessTimeTimer(pk, proofTime)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -171,30 +171,30 @@ func (m *ZKProofReceiver) checkAreAllChunkProofsReady(ctx context.Context, chunk
|
||||
}
|
||||
|
||||
func (m *ZKProofReceiver) validator(proverTask *orm.ProverTask, pk string, proofMsg *message.ProofMsg) error {
|
||||
// Ensure this roller is eligible to participate in the prover task.
|
||||
if types.RollerProveStatus(proverTask.ProvingStatus) == types.RollerProofValid {
|
||||
// Ensure this prover is eligible to participate in the prover task.
|
||||
if types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofValid {
|
||||
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
|
||||
// TODO: Defend invalid proof resubmissions by one of the following two methods:
|
||||
// (i) slash the roller for each submission of invalid proof
|
||||
// (i) slash the prover for each submission of invalid proof
|
||||
// (ii) set the maximum failure retry times
|
||||
log.Warn("roller has already submitted valid proof in proof session", "roller name", proverTask.ProverName,
|
||||
"roller pk", proverTask.ProverPublicKey, "proof type", proverTask.TaskType, "proof id", proofMsg.ProofDetail.ID)
|
||||
return ErrValidatorFailureRollerInfoHasProofValid
|
||||
log.Warn("prover has already submitted valid proof in proof session", "prover name", proverTask.ProverName,
|
||||
"prover pk", proverTask.ProverPublicKey, "proof type", proverTask.TaskType, "proof id", proofMsg.ProofDetail.ID)
|
||||
return ErrValidatorFailureProverInfoHasProofValid
|
||||
}
|
||||
|
||||
proofTime := time.Since(proverTask.CreatedAt)
|
||||
proofTimeSec := uint64(proofTime.Seconds())
|
||||
|
||||
log.Info("handling zk proof", "proof id", proofMsg.ID, "roller name", proverTask.ProverName,
|
||||
"roller pk", pk, "prove type", proverTask.TaskType, "proof time", proofTimeSec)
|
||||
log.Info("handling zk proof", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
|
||||
"prover pk", pk, "prove type", proverTask.TaskType, "proof time", proofTimeSec)
|
||||
|
||||
if proofMsg.Status != message.StatusOk {
|
||||
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
|
||||
|
||||
rollermanager.Manager.UpdateMetricRollerProofsGeneratedFailedTimeTimer(pk, proofTime)
|
||||
provermanager.Manager.UpdateMetricProverProofsGeneratedFailedTimeTimer(pk, proofTime)
|
||||
|
||||
log.Info("proof generated by roller failed", "proof id", proofMsg.ID, "roller name", proverTask.ProverName,
|
||||
"roller pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", proofMsg.Error)
|
||||
log.Info("proof generated by prover failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
|
||||
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", proofMsg.Error)
|
||||
return ErrValidatorFailureProofMsgStatusNotOk
|
||||
}
|
||||
return nil
|
||||
@@ -219,7 +219,7 @@ func (m *ZKProofReceiver) closeProofTask(ctx context.Context, hash string, pubKe
|
||||
return err
|
||||
}
|
||||
|
||||
rollermanager.Manager.FreeTaskIDForRoller(pubKey, hash)
|
||||
provermanager.Manager.FreeTaskIDForProver(pubKey, hash)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -231,12 +231,12 @@ func (m *ZKProofReceiver) updateProofStatus(ctx context.Context, hash string, pr
|
||||
return nil
|
||||
}
|
||||
|
||||
var proverTaskStatus types.RollerProveStatus
|
||||
var proverTaskStatus types.ProverProveStatus
|
||||
switch status {
|
||||
case types.ProvingTaskFailed, types.ProvingTaskUnassigned:
|
||||
proverTaskStatus = types.RollerProofInvalid
|
||||
proverTaskStatus = types.ProverProofInvalid
|
||||
case types.ProvingTaskVerified:
|
||||
proverTaskStatus = types.RollerProofValid
|
||||
proverTaskStatus = types.ProverProofValid
|
||||
}
|
||||
|
||||
err := m.db.Transaction(func(tx *gorm.DB) error {
|
||||
|
||||
@@ -11,12 +11,12 @@ import (
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
)
|
||||
|
||||
var coordinatorRollersDisconnectsTotalCounter = gethMetrics.NewRegisteredCounter("coordinator/rollers/disconnects/total", metrics.ScrollRegistry)
|
||||
var coordinatorProversDisconnectsTotalCounter = gethMetrics.NewRegisteredCounter("coordinator/provers/disconnects/total", metrics.ScrollRegistry)
|
||||
|
||||
// TaskWorker held the roller task connection
|
||||
// TaskWorker held the prover task connection
|
||||
type TaskWorker struct{}
|
||||
|
||||
// NewTaskWorker create a task worker
|
||||
@@ -38,8 +38,8 @@ func (t *TaskWorker) AllocTaskWorker(ctx context.Context, authMsg *message.AuthM
|
||||
|
||||
identity := authMsg.Identity
|
||||
|
||||
// create or get the roller message channel
|
||||
taskCh, err := rollermanager.Manager.Register(ctx, pubKey, identity)
|
||||
// create or get the prover message channel
|
||||
taskCh, err := provermanager.Manager.Register(ctx, pubKey, identity)
|
||||
if err != nil {
|
||||
return &rpc.Subscription{}, err
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func (t *TaskWorker) AllocTaskWorker(ctx context.Context, authMsg *message.AuthM
|
||||
|
||||
go t.worker(rpcSub, notifier, pubKey, identity, taskCh)
|
||||
|
||||
log.Info("roller register", "name", identity.Name, "pubKey", pubKey, "version", identity.Version)
|
||||
log.Info("prover register", "name", identity.Name, "pubKey", pubKey, "version", identity.Version)
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
@@ -60,8 +60,8 @@ func (t *TaskWorker) worker(rpcSub *rpc.Subscription, notifier *rpc.Notifier, pu
|
||||
log.Error("task worker subId:%d panic for:%v", err)
|
||||
}
|
||||
|
||||
rollermanager.Manager.FreeRoller(pubKey)
|
||||
log.Info("roller unregister", "name", identity.Name, "pubKey", pubKey)
|
||||
provermanager.Manager.FreeProver(pubKey)
|
||||
log.Info("prover unregister", "name", identity.Name, "pubKey", pubKey)
|
||||
}()
|
||||
|
||||
for {
|
||||
@@ -69,7 +69,7 @@ func (t *TaskWorker) worker(rpcSub *rpc.Subscription, notifier *rpc.Notifier, pu
|
||||
case task := <-taskCh:
|
||||
notifier.Notify(rpcSub.ID, task) //nolint
|
||||
case err := <-rpcSub.Err():
|
||||
coordinatorRollersDisconnectsTotalCounter.Inc(1)
|
||||
coordinatorProversDisconnectsTotalCounter.Inc(1)
|
||||
log.Warn("client stopped the ws connection", "name", identity.Name, "pubkey", pubKey, "err", err)
|
||||
return
|
||||
case <-notifier.Closed():
|
||||
|
||||
60
coordinator/internal/logic/provermanager/metrics.go
Normal file
60
coordinator/internal/logic/provermanager/metrics.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package provermanager
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
gethMetrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
type proverMetrics struct {
|
||||
proverProofsVerifiedSuccessTimeTimer gethMetrics.Timer
|
||||
proverProofsVerifiedFailedTimeTimer gethMetrics.Timer
|
||||
proverProofsGeneratedFailedTimeTimer gethMetrics.Timer
|
||||
proverProofsLastAssignedTimestampGauge gethMetrics.Gauge
|
||||
proverProofsLastFinishedTimestampGauge gethMetrics.Gauge
|
||||
}
|
||||
|
||||
func (r *proverManager) UpdateMetricProverProofsLastFinishedTimestampGauge(pk string) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
rMs := node.(*proverNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.proverProofsLastFinishedTimestampGauge.Update(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *proverManager) UpdateMetricProverProofsLastAssignedTimestampGauge(pk string) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
rMs := node.(*proverNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.proverProofsLastAssignedTimestampGauge.Update(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *proverManager) UpdateMetricProverProofsVerifiedSuccessTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
rMs := node.(*proverNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.proverProofsVerifiedSuccessTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *proverManager) UpdateMetricProverProofsVerifiedFailedTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
rMs := node.(*proverNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.proverProofsVerifiedFailedTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *proverManager) UpdateMetricProverProofsGeneratedFailedTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
rMs := node.(*proverNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.proverProofsGeneratedFailedTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
203
coordinator/internal/logic/provermanager/prover_manager.go
Normal file
203
coordinator/internal/logic/provermanager/prover_manager.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package provermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
gethMetrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
// Manager the global prover manager
|
||||
Manager *proverManager
|
||||
)
|
||||
|
||||
// proverNode is the interface that controls the Provers
|
||||
type proverNode struct {
|
||||
// Core name
|
||||
Name string
|
||||
// Core type
|
||||
Type message.ProofType
|
||||
// Core public key
|
||||
PublicKey string
|
||||
// Core version
|
||||
Version string
|
||||
|
||||
// task channel
|
||||
taskChan chan *message.TaskMsg
|
||||
// session id list which delivered to prover.
|
||||
TaskIDs cmap.ConcurrentMap
|
||||
|
||||
// Time of message creation
|
||||
registerTime time.Time
|
||||
|
||||
metrics *proverMetrics
|
||||
}
|
||||
|
||||
type proverManager struct {
|
||||
proverPool cmap.ConcurrentMap
|
||||
proverTaskOrm *orm.ProverTask
|
||||
}
|
||||
|
||||
// InitProverManager init a prover manager
|
||||
func InitProverManager(db *gorm.DB) {
|
||||
once.Do(func() {
|
||||
Manager = &proverManager{
|
||||
proverPool: cmap.New(),
|
||||
proverTaskOrm: orm.NewProverTask(db),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Register the identity message to prover manager with the public key
|
||||
func (r *proverManager) Register(ctx context.Context, proverPublicKey string, identity *message.Identity) (<-chan *message.TaskMsg, error) {
|
||||
node, ok := r.proverPool.Get(proverPublicKey)
|
||||
if !ok {
|
||||
taskIDs, err := r.reloadProverAssignedTasks(ctx, proverPublicKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("register error:%w", err)
|
||||
}
|
||||
|
||||
rMs := &proverMetrics{
|
||||
proverProofsVerifiedSuccessTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("prover/proofs/verified/success/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
proverProofsVerifiedFailedTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("prover/proofs/verified/failed/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
proverProofsGeneratedFailedTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("prover/proofs/generated/failed/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
proverProofsLastAssignedTimestampGauge: gethMetrics.GetOrRegisterGauge(fmt.Sprintf("prover/proofs/last/assigned/timestamp/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
proverProofsLastFinishedTimestampGauge: gethMetrics.GetOrRegisterGauge(fmt.Sprintf("prover/proofs/last/finished/timestamp/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
}
|
||||
node = &proverNode{
|
||||
Name: identity.Name,
|
||||
Type: identity.ProverType,
|
||||
Version: identity.Version,
|
||||
PublicKey: proverPublicKey,
|
||||
TaskIDs: *taskIDs,
|
||||
taskChan: make(chan *message.TaskMsg, 4),
|
||||
metrics: rMs,
|
||||
}
|
||||
r.proverPool.Set(proverPublicKey, node)
|
||||
}
|
||||
prover := node.(*proverNode)
|
||||
// avoid reconnection too frequently.
|
||||
if time.Since(prover.registerTime) < 60 {
|
||||
log.Warn("prover reconnect too frequently", "prover_name", identity.Name, "prover_type", identity.ProverType, "public key", proverPublicKey)
|
||||
return nil, fmt.Errorf("prover reconnect too frequently")
|
||||
}
|
||||
// update register time and status
|
||||
prover.registerTime = time.Now()
|
||||
|
||||
return prover.taskChan, nil
|
||||
}
|
||||
|
||||
func (r *proverManager) reloadProverAssignedTasks(ctx context.Context, proverPublicKey string) (*cmap.ConcurrentMap, error) {
|
||||
var assignedProverTasks []orm.ProverTask
|
||||
page := 0
|
||||
limit := 100
|
||||
for {
|
||||
page++
|
||||
whereFields := make(map[string]interface{})
|
||||
whereFields["proving_status"] = int16(types.ProverAssigned)
|
||||
orderBy := []string{"id asc"}
|
||||
offset := (page - 1) * limit
|
||||
batchAssignedProverTasks, err := r.proverTaskOrm.GetProverTasks(ctx, whereFields, orderBy, offset, limit)
|
||||
if err != nil {
|
||||
log.Warn("reloadProverAssignedTasks get all assigned failure", "error", err)
|
||||
return nil, fmt.Errorf("reloadProverAssignedTasks error:%w", err)
|
||||
}
|
||||
if len(batchAssignedProverTasks) < limit {
|
||||
break
|
||||
}
|
||||
assignedProverTasks = append(assignedProverTasks, batchAssignedProverTasks...)
|
||||
}
|
||||
|
||||
taskIDs := cmap.New()
|
||||
for _, assignedProverTask := range assignedProverTasks {
|
||||
if assignedProverTask.ProverPublicKey == proverPublicKey && assignedProverTask.ProvingStatus == int16(types.ProverAssigned) {
|
||||
taskIDs.Set(assignedProverTask.TaskID, struct{}{})
|
||||
}
|
||||
}
|
||||
return &taskIDs, nil
|
||||
}
|
||||
|
||||
// SendTask send the need proved message to prover
|
||||
func (r *proverManager) SendTask(proofType message.ProofType, msg *message.TaskMsg) (string, string, error) {
|
||||
tmpProver := r.selectProver(proofType)
|
||||
if tmpProver == nil {
|
||||
return "", "", errors.New("selectProver returns nil")
|
||||
}
|
||||
|
||||
select {
|
||||
case tmpProver.taskChan <- msg:
|
||||
tmpProver.TaskIDs.Set(msg.ID, struct{}{})
|
||||
default:
|
||||
err := fmt.Errorf("prover channel is full, ProverName:%s, publicKey:%s", tmpProver.Name, tmpProver.PublicKey)
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
r.UpdateMetricProverProofsLastAssignedTimestampGauge(tmpProver.PublicKey)
|
||||
|
||||
return tmpProver.PublicKey, tmpProver.Name, nil
|
||||
}
|
||||
|
||||
// ExistTaskIDForProver check the task exist
|
||||
func (r *proverManager) ExistTaskIDForProver(pk string, id string) bool {
|
||||
node, ok := r.proverPool.Get(pk)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
prover := node.(*proverNode)
|
||||
return prover.TaskIDs.Has(id)
|
||||
}
|
||||
|
||||
// FreeProver free the prover with the pk key
|
||||
func (r *proverManager) FreeProver(pk string) {
|
||||
r.proverPool.Pop(pk)
|
||||
}
|
||||
|
||||
// FreeTaskIDForProver free a task of the pk prover
|
||||
func (r *proverManager) FreeTaskIDForProver(pk string, id string) {
|
||||
if node, ok := r.proverPool.Get(pk); ok {
|
||||
prover := node.(*proverNode)
|
||||
prover.TaskIDs.Pop(id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetNumberOfIdleProvers return the count of idle provers.
|
||||
func (r *proverManager) GetNumberOfIdleProvers(proofType message.ProofType) (count int) {
|
||||
for item := range r.proverPool.IterBuffered() {
|
||||
prover := item.Val.(*proverNode)
|
||||
if prover.TaskIDs.Count() == 0 && prover.Type == proofType {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func (r *proverManager) selectProver(proofType message.ProofType) *proverNode {
|
||||
pubkeys := r.proverPool.Keys()
|
||||
for len(pubkeys) > 0 {
|
||||
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(pubkeys))))
|
||||
if val, ok := r.proverPool.Get(pubkeys[idx.Int64()]); ok {
|
||||
rn := val.(*proverNode)
|
||||
if rn.TaskIDs.Count() == 0 && rn.Type == proofType {
|
||||
return rn
|
||||
}
|
||||
}
|
||||
pubkeys[idx.Int64()], pubkeys = pubkeys[0], pubkeys[1:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
package rollermanager
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
gethMetrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
type rollerMetrics struct {
|
||||
rollerProofsVerifiedSuccessTimeTimer gethMetrics.Timer
|
||||
rollerProofsVerifiedFailedTimeTimer gethMetrics.Timer
|
||||
rollerProofsGeneratedFailedTimeTimer gethMetrics.Timer
|
||||
rollerProofsLastAssignedTimestampGauge gethMetrics.Gauge
|
||||
rollerProofsLastFinishedTimestampGauge gethMetrics.Gauge
|
||||
}
|
||||
|
||||
func (r *rollerManager) UpdateMetricRollerProofsLastFinishedTimestampGauge(pk string) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsLastFinishedTimestampGauge.Update(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rollerManager) UpdateMetricRollerProofsLastAssignedTimestampGauge(pk string) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsLastAssignedTimestampGauge.Update(time.Now().Unix())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rollerManager) UpdateMetricRollerProofsVerifiedSuccessTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsVerifiedSuccessTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rollerManager) UpdateMetricRollerProofsVerifiedFailedTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsVerifiedFailedTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rollerManager) UpdateMetricRollerProofsGeneratedFailedTimeTimer(pk string, d time.Duration) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
rMs := node.(*rollerNode).metrics
|
||||
if rMs != nil {
|
||||
rMs.rollerProofsGeneratedFailedTimeTimer.Update(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,203 +0,0 @@
|
||||
package rollermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
gethMetrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
// Manager the global roller manager
|
||||
Manager *rollerManager
|
||||
)
|
||||
|
||||
// RollerNode is the interface that controls the rollers
|
||||
type rollerNode struct {
|
||||
// Roller name
|
||||
Name string
|
||||
// Roller type
|
||||
Type message.ProofType
|
||||
// Roller public key
|
||||
PublicKey string
|
||||
// Roller version
|
||||
Version string
|
||||
|
||||
// task channel
|
||||
taskChan chan *message.TaskMsg
|
||||
// session id list which delivered to roller.
|
||||
TaskIDs cmap.ConcurrentMap
|
||||
|
||||
// Time of message creation
|
||||
registerTime time.Time
|
||||
|
||||
metrics *rollerMetrics
|
||||
}
|
||||
|
||||
type rollerManager struct {
|
||||
rollerPool cmap.ConcurrentMap
|
||||
proverTaskOrm *orm.ProverTask
|
||||
}
|
||||
|
||||
// InitRollerManager init a roller manager
|
||||
func InitRollerManager(db *gorm.DB) {
|
||||
once.Do(func() {
|
||||
Manager = &rollerManager{
|
||||
rollerPool: cmap.New(),
|
||||
proverTaskOrm: orm.NewProverTask(db),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Register the identity message to roller manager with the public key
|
||||
func (r *rollerManager) Register(ctx context.Context, proverPublicKey string, identity *message.Identity) (<-chan *message.TaskMsg, error) {
|
||||
node, ok := r.rollerPool.Get(proverPublicKey)
|
||||
if !ok {
|
||||
taskIDs, err := r.reloadRollerAssignedTasks(ctx, proverPublicKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("register error:%w", err)
|
||||
}
|
||||
|
||||
rMs := &rollerMetrics{
|
||||
rollerProofsVerifiedSuccessTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/success/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
rollerProofsVerifiedFailedTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/failed/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
rollerProofsGeneratedFailedTimeTimer: gethMetrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/generated/failed/time/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
rollerProofsLastAssignedTimestampGauge: gethMetrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/assigned/timestamp/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
rollerProofsLastFinishedTimestampGauge: gethMetrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/finished/timestamp/%s", proverPublicKey), metrics.ScrollRegistry),
|
||||
}
|
||||
node = &rollerNode{
|
||||
Name: identity.Name,
|
||||
Type: identity.RollerType,
|
||||
Version: identity.Version,
|
||||
PublicKey: proverPublicKey,
|
||||
TaskIDs: *taskIDs,
|
||||
taskChan: make(chan *message.TaskMsg, 4),
|
||||
metrics: rMs,
|
||||
}
|
||||
r.rollerPool.Set(proverPublicKey, node)
|
||||
}
|
||||
roller := node.(*rollerNode)
|
||||
// avoid reconnection too frequently.
|
||||
if time.Since(roller.registerTime) < 60 {
|
||||
log.Warn("roller reconnect too frequently", "prover_name", identity.Name, "roller_type", identity.RollerType, "public key", proverPublicKey)
|
||||
return nil, fmt.Errorf("roller reconnect too frequently")
|
||||
}
|
||||
// update register time and status
|
||||
roller.registerTime = time.Now()
|
||||
|
||||
return roller.taskChan, nil
|
||||
}
|
||||
|
||||
func (r *rollerManager) reloadRollerAssignedTasks(ctx context.Context, proverPublicKey string) (*cmap.ConcurrentMap, error) {
|
||||
var assignedProverTasks []orm.ProverTask
|
||||
page := 0
|
||||
limit := 100
|
||||
for {
|
||||
page++
|
||||
whereFields := make(map[string]interface{})
|
||||
whereFields["proving_status"] = int16(types.RollerAssigned)
|
||||
orderBy := []string{"id asc"}
|
||||
offset := (page - 1) * limit
|
||||
batchAssignedProverTasks, err := r.proverTaskOrm.GetProverTasks(ctx, whereFields, orderBy, offset, limit)
|
||||
if err != nil {
|
||||
log.Warn("reloadRollerAssignedTasks get all assigned failure", "error", err)
|
||||
return nil, fmt.Errorf("reloadRollerAssignedTasks error:%w", err)
|
||||
}
|
||||
if len(batchAssignedProverTasks) < limit {
|
||||
break
|
||||
}
|
||||
assignedProverTasks = append(assignedProverTasks, batchAssignedProverTasks...)
|
||||
}
|
||||
|
||||
taskIDs := cmap.New()
|
||||
for _, assignedProverTask := range assignedProverTasks {
|
||||
if assignedProverTask.ProverPublicKey == proverPublicKey && assignedProverTask.ProvingStatus == int16(types.RollerAssigned) {
|
||||
taskIDs.Set(assignedProverTask.TaskID, struct{}{})
|
||||
}
|
||||
}
|
||||
return &taskIDs, nil
|
||||
}
|
||||
|
||||
// SendTask send the need proved message to roller
|
||||
func (r *rollerManager) SendTask(rollerType message.ProofType, msg *message.TaskMsg) (string, string, error) {
|
||||
tmpRoller := r.selectRoller(rollerType)
|
||||
if tmpRoller == nil {
|
||||
return "", "", errors.New("selectRoller returns nil")
|
||||
}
|
||||
|
||||
select {
|
||||
case tmpRoller.taskChan <- msg:
|
||||
tmpRoller.TaskIDs.Set(msg.ID, struct{}{})
|
||||
default:
|
||||
err := fmt.Errorf("roller channel is full, rollerName:%s, publicKey:%s", tmpRoller.Name, tmpRoller.PublicKey)
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
r.UpdateMetricRollerProofsLastAssignedTimestampGauge(tmpRoller.PublicKey)
|
||||
|
||||
return tmpRoller.PublicKey, tmpRoller.Name, nil
|
||||
}
|
||||
|
||||
// ExistTaskIDForRoller check the task exist
|
||||
func (r *rollerManager) ExistTaskIDForRoller(pk string, id string) bool {
|
||||
node, ok := r.rollerPool.Get(pk)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
roller := node.(*rollerNode)
|
||||
return roller.TaskIDs.Has(id)
|
||||
}
|
||||
|
||||
// FreeRoller free the roller with the pk key
|
||||
func (r *rollerManager) FreeRoller(pk string) {
|
||||
r.rollerPool.Pop(pk)
|
||||
}
|
||||
|
||||
// FreeTaskIDForRoller free a task of the pk roller
|
||||
func (r *rollerManager) FreeTaskIDForRoller(pk string, id string) {
|
||||
if node, ok := r.rollerPool.Get(pk); ok {
|
||||
roller := node.(*rollerNode)
|
||||
roller.TaskIDs.Pop(id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetNumberOfIdleRollers return the count of idle rollers.
|
||||
func (r *rollerManager) GetNumberOfIdleRollers(rollerType message.ProofType) (count int) {
|
||||
for item := range r.rollerPool.IterBuffered() {
|
||||
roller := item.Val.(*rollerNode)
|
||||
if roller.TaskIDs.Count() == 0 && roller.Type == rollerType {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func (r *rollerManager) selectRoller(rollerType message.ProofType) *rollerNode {
|
||||
pubkeys := r.rollerPool.Keys()
|
||||
for len(pubkeys) > 0 {
|
||||
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(pubkeys))))
|
||||
if val, ok := r.rollerPool.Get(pubkeys[idx.Int64()]); ok {
|
||||
rn := val.(*rollerNode)
|
||||
if rn.TaskIDs.Count() == 0 && rn.Type == rollerType {
|
||||
return rn
|
||||
}
|
||||
}
|
||||
pubkeys[idx.Int64()], pubkeys = pubkeys[0], pubkeys[1:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -68,9 +68,9 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
|
||||
proverTask := ProverTask{
|
||||
TaskID: "test-hash",
|
||||
ProverName: "roller-0",
|
||||
ProverName: "prover-0",
|
||||
ProverPublicKey: "0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromBigInt(reward, 0),
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
@@ -87,7 +87,7 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
assert.Equal(t, resultReward, reward)
|
||||
assert.Equal(t, resultReward.String(), "18446744073709551616")
|
||||
|
||||
proverTask.ProvingStatus = int16(types.RollerProofValid)
|
||||
proverTask.ProvingStatus = int16(types.ProverProofValid)
|
||||
proverTask.AssignedAt = utils.NowUTC()
|
||||
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
|
||||
assert.NoError(t, err)
|
||||
@@ -107,9 +107,9 @@ func TestProverTaskOrmUint256(t *testing.T) {
|
||||
rewardUint256.SetString("115792089237316195423570985008687907853269984665640564039457584007913129639935", 10)
|
||||
proverTask := ProverTask{
|
||||
TaskID: "test-hash",
|
||||
ProverName: "roller-0",
|
||||
ProverName: "prover-0",
|
||||
ProverPublicKey: "0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromBigInt(rewardUint256, 0),
|
||||
AssignedAt: utils.NowUTC(),
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
// ProverTask is assigned rollers info of chunk/batch proof prover task
|
||||
// ProverTask is assigned provers info of chunk/batch proof prover task
|
||||
type ProverTask struct {
|
||||
db *gorm.DB `gorm:"column:-"`
|
||||
|
||||
@@ -115,7 +115,7 @@ func (o *ProverTask) GetProverTaskByTaskIDAndPubKey(ctx context.Context, taskID,
|
||||
func (o *ProverTask) GetAssignedProverTasks(ctx context.Context, limit int) ([]ProverTask, error) {
|
||||
db := o.db.WithContext(ctx)
|
||||
db = db.Model(&ProverTask{})
|
||||
db = db.Where("proving_status", int(types.RollerAssigned))
|
||||
db = db.Where("proving_status", int(types.ProverAssigned))
|
||||
db = db.Limit(limit)
|
||||
|
||||
var proverTasks []ProverTask
|
||||
@@ -146,7 +146,7 @@ func (o *ProverTask) SetProverTask(ctx context.Context, proverTask *ProverTask,
|
||||
}
|
||||
|
||||
// UpdateProverTaskProvingStatus updates the proving_status of a specific ProverTask record.
|
||||
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.RollerProveStatus, dbTX ...*gorm.DB) error {
|
||||
func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofType message.ProofType, taskID string, pk string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
@@ -162,7 +162,7 @@ func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofTyp
|
||||
}
|
||||
|
||||
// UpdateAllProverTaskProvingStatusOfTaskID updates all the proving_status of a specific task id.
|
||||
func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.RollerProveStatus, dbTX ...*gorm.DB) error {
|
||||
func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
|
||||
db := o.db
|
||||
if len(dbTX) > 0 && dbTX[0] != nil {
|
||||
db = dbTX[0]
|
||||
|
||||
@@ -5,17 +5,17 @@ import (
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
// RollersInfo is assigned rollers info of a task (session)
|
||||
type RollersInfo struct {
|
||||
// ProversInfo is assigned provers info of a task (session)
|
||||
type ProversInfo struct {
|
||||
ID string `json:"id"`
|
||||
RollerStatusList []*RollerStatus `json:"rollers"`
|
||||
ProverStatusList []*ProverStatus `json:"provers"`
|
||||
StartTimestamp int64 `json:"start_timestamp"`
|
||||
ProveType message.ProofType `json:"prove_type,omitempty"`
|
||||
}
|
||||
|
||||
// RollerStatus is the roller name and roller prove status
|
||||
type RollerStatus struct {
|
||||
// ProverStatus is the prover name and prover prove status
|
||||
type ProverStatus struct {
|
||||
PublicKey string `json:"public_key"`
|
||||
Name string `json:"name"`
|
||||
Status types.RollerProveStatus `json:"status"`
|
||||
Status types.ProverProveStatus `json:"status"`
|
||||
}
|
||||
@@ -31,7 +31,7 @@ import (
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/controller/cron"
|
||||
"scroll-tech/coordinator/internal/logic/rollermanager"
|
||||
"scroll-tech/coordinator/internal/logic/provermanager"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ func randomURL() string {
|
||||
return fmt.Sprintf("localhost:%d", 10000+2000+id.Int64())
|
||||
}
|
||||
|
||||
func setupCoordinator(t *testing.T, rollersPerSession uint8, wsURL string, resetDB bool) (*http.Server, *cron.Collector) {
|
||||
func setupCoordinator(t *testing.T, proversPerSession uint8, wsURL string, resetDB bool) (*http.Server, *cron.Collector) {
|
||||
var err error
|
||||
db, err = database.InitDB(dbCfg)
|
||||
assert.NoError(t, err)
|
||||
@@ -72,8 +72,8 @@ func setupCoordinator(t *testing.T, rollersPerSession uint8, wsURL string, reset
|
||||
}
|
||||
|
||||
conf := config.Config{
|
||||
RollerManagerConfig: &config.RollerManagerConfig{
|
||||
RollersPerSession: rollersPerSession,
|
||||
ProverManagerConfig: &config.ProverManagerConfig{
|
||||
ProversPerSession: proversPerSession,
|
||||
Verifier: &config.VerifierConfig{MockMode: true},
|
||||
CollectionTime: 1,
|
||||
TokenTimeToLive: 5,
|
||||
@@ -85,7 +85,7 @@ func setupCoordinator(t *testing.T, rollersPerSession uint8, wsURL string, reset
|
||||
tmpAPI := api.RegisterAPIs(&conf, db)
|
||||
handler, _, err := utils.StartWSEndpoint(strings.Split(wsURL, "//")[1], tmpAPI, flate.NoCompression)
|
||||
assert.NoError(t, err)
|
||||
rollermanager.InitRollerManager(db)
|
||||
provermanager.InitProverManager(db)
|
||||
return handler, proofCollector
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ func TestApis(t *testing.T) {
|
||||
t.Run("TestInvalidProof", testInvalidProof)
|
||||
t.Run("TestProofGeneratedFailed", testProofGeneratedFailed)
|
||||
t.Run("TestTimeoutProof", testTimeoutProof)
|
||||
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
|
||||
t.Run("TestIdleProverSelection", testIdleProverSelection)
|
||||
t.Run("TestGracefulRestart", testGracefulRestart)
|
||||
|
||||
// Teardown
|
||||
@@ -157,14 +157,14 @@ func testHandshake(t *testing.T) {
|
||||
proofCollector.Stop()
|
||||
}()
|
||||
|
||||
roller1 := newMockRoller(t, "roller_test", wsURL, message.ProofTypeChunk)
|
||||
defer roller1.close()
|
||||
prover1 := newMockProver(t, "prover_test", wsURL, message.ProofTypeChunk)
|
||||
defer prover1.close()
|
||||
|
||||
roller2 := newMockRoller(t, "roller_test", wsURL, message.ProofTypeBatch)
|
||||
defer roller2.close()
|
||||
prover2 := newMockProver(t, "prover_test", wsURL, message.ProofTypeBatch)
|
||||
defer prover2.close()
|
||||
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
}
|
||||
|
||||
func testFailedHandshake(t *testing.T) {
|
||||
@@ -177,7 +177,7 @@ func testFailedHandshake(t *testing.T) {
|
||||
}()
|
||||
|
||||
// prepare
|
||||
name := "roller_test"
|
||||
name := "prover_test"
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -222,7 +222,7 @@ func testFailedHandshake(t *testing.T) {
|
||||
_, err = c.RegisterAndSubscribe(ctx, make(chan *message.TaskMsg, 4), authMsg)
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.Equal(t, 0, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 0, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
}
|
||||
|
||||
func testSeveralConnections(t *testing.T) {
|
||||
@@ -236,25 +236,25 @@ func testSeveralConnections(t *testing.T) {
|
||||
var (
|
||||
batch = 200
|
||||
eg = errgroup.Group{}
|
||||
rollers = make([]*mockRoller, batch)
|
||||
provers = make([]*mockProver, batch)
|
||||
)
|
||||
for i := 0; i < batch; i += 2 {
|
||||
idx := i
|
||||
eg.Go(func() error {
|
||||
rollers[idx] = newMockRoller(t, "roller_test_"+strconv.Itoa(idx), wsURL, message.ProofTypeChunk)
|
||||
rollers[idx+1] = newMockRoller(t, "roller_test_"+strconv.Itoa(idx+1), wsURL, message.ProofTypeBatch)
|
||||
provers[idx] = newMockProver(t, "prover_test_"+strconv.Itoa(idx), wsURL, message.ProofTypeChunk)
|
||||
provers[idx+1] = newMockProver(t, "prover_test_"+strconv.Itoa(idx+1), wsURL, message.ProofTypeBatch)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
assert.NoError(t, eg.Wait())
|
||||
|
||||
// check roller's idle connections
|
||||
assert.Equal(t, batch/2, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, batch/2, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
// check prover's idle connections
|
||||
assert.Equal(t, batch/2, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, batch/2, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
for _, prover := range provers {
|
||||
prover.close()
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -264,11 +264,11 @@ func testSeveralConnections(t *testing.T) {
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
if rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk) == 0 {
|
||||
if provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk) == 0 {
|
||||
return
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("roller connect is blocked")
|
||||
t.Error("prover connect is blocked")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -282,33 +282,33 @@ func testValidProof(t *testing.T) {
|
||||
collector.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 6)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
// create mock provers.
|
||||
provers := make([]*mockProver, 6)
|
||||
for i := 0; i < len(provers); i++ {
|
||||
var proofType message.ProofType
|
||||
if i%2 == 0 {
|
||||
proofType = message.ProofTypeChunk
|
||||
} else {
|
||||
proofType = message.ProofTypeBatch
|
||||
}
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
|
||||
// only roller 0 & 1 submit valid proofs.
|
||||
// only prover 0 & 1 submit valid proofs.
|
||||
proofStatus := generatedFailed
|
||||
if i <= 1 {
|
||||
proofStatus = verifiedSuccess
|
||||
}
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, proofStatus)
|
||||
provers[i].waitTaskAndSendProof(t, time.Second, false, proofStatus)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
for _, prover := range provers {
|
||||
prover.close()
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*types.WrappedBlock{wrappedBlock1, wrappedBlock2})
|
||||
assert.NoError(t, err)
|
||||
@@ -350,26 +350,26 @@ func testInvalidProof(t *testing.T) {
|
||||
collector.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 6)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
// create mock provers.
|
||||
provers := make([]*mockProver, 6)
|
||||
for i := 0; i < len(provers); i++ {
|
||||
var proofType message.ProofType
|
||||
if i%2 == 0 {
|
||||
proofType = message.ProofTypeChunk
|
||||
} else {
|
||||
proofType = message.ProofTypeBatch
|
||||
}
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, verifiedFailed)
|
||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
provers[i].waitTaskAndSendProof(t, time.Second, false, verifiedFailed)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
for _, prover := range provers {
|
||||
prover.close()
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*types.WrappedBlock{wrappedBlock1, wrappedBlock2})
|
||||
assert.NoError(t, err)
|
||||
@@ -411,26 +411,26 @@ func testProofGeneratedFailed(t *testing.T) {
|
||||
collector.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 6)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
// create mock provers.
|
||||
provers := make([]*mockProver, 6)
|
||||
for i := 0; i < len(provers); i++ {
|
||||
var proofType message.ProofType
|
||||
if i%2 == 0 {
|
||||
proofType = message.ProofTypeChunk
|
||||
} else {
|
||||
proofType = message.ProofTypeBatch
|
||||
}
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, generatedFailed)
|
||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
provers[i].waitTaskAndSendProof(t, time.Second, false, generatedFailed)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
for _, prover := range provers {
|
||||
prover.close()
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 3, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*types.WrappedBlock{wrappedBlock1, wrappedBlock2})
|
||||
assert.NoError(t, err)
|
||||
@@ -472,16 +472,16 @@ func testTimeoutProof(t *testing.T) {
|
||||
collector.Stop()
|
||||
}()
|
||||
|
||||
// create first chunk & batch mock roller, that will not send any proof.
|
||||
chunkRoller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL, message.ProofTypeChunk)
|
||||
batchRoller1 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL, message.ProofTypeBatch)
|
||||
// create first chunk & batch mock prover, that will not send any proof.
|
||||
chunkProver1 := newMockProver(t, "prover_test"+strconv.Itoa(0), wsURL, message.ProofTypeChunk)
|
||||
batchProver1 := newMockProver(t, "prover_test"+strconv.Itoa(1), wsURL, message.ProofTypeBatch)
|
||||
defer func() {
|
||||
// close connection
|
||||
chunkRoller1.close()
|
||||
batchRoller1.close()
|
||||
chunkProver1.close()
|
||||
batchProver1.close()
|
||||
}()
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*types.WrappedBlock{wrappedBlock1, wrappedBlock2})
|
||||
assert.NoError(t, err)
|
||||
@@ -492,7 +492,7 @@ func testTimeoutProof(t *testing.T) {
|
||||
err = batchOrm.UpdateChunkProofsStatusByBatchHash(context.Background(), batch.Hash, types.ChunkProofsStatusReady)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// verify proof status, it should be assigned, because roller didn't send any proof
|
||||
// verify proof status, it should be assigned, because prover didn't send any proof
|
||||
ok := utils.TryTimes(30, func() bool {
|
||||
chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
if err != nil {
|
||||
@@ -506,20 +506,20 @@ func testTimeoutProof(t *testing.T) {
|
||||
})
|
||||
assert.Falsef(t, !ok, "failed to check proof status")
|
||||
|
||||
// create second mock roller, that will send valid proof.
|
||||
chunkRoller2 := newMockRoller(t, "roller_test"+strconv.Itoa(2), wsURL, message.ProofTypeChunk)
|
||||
chunkRoller2.waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
batchRoller2 := newMockRoller(t, "roller_test"+strconv.Itoa(3), wsURL, message.ProofTypeBatch)
|
||||
batchRoller2.waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
// create second mock prover, that will send valid proof.
|
||||
chunkProver2 := newMockProver(t, "prover_test"+strconv.Itoa(2), wsURL, message.ProofTypeChunk)
|
||||
chunkProver2.waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
batchProver2 := newMockProver(t, "prover_test"+strconv.Itoa(3), wsURL, message.ProofTypeBatch)
|
||||
batchProver2.waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
defer func() {
|
||||
// close connection
|
||||
chunkRoller2.close()
|
||||
batchRoller2.close()
|
||||
chunkProver2.close()
|
||||
batchProver2.close()
|
||||
}()
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, 1, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
// verify proof status, it should be verified now, because second roller sent valid proof
|
||||
// verify proof status, it should be verified now, because second prover sent valid proof
|
||||
ok = utils.TryTimes(200, func() bool {
|
||||
chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
if err != nil {
|
||||
@@ -534,7 +534,7 @@ func testTimeoutProof(t *testing.T) {
|
||||
assert.Falsef(t, !ok, "failed to check proof status")
|
||||
}
|
||||
|
||||
func testIdleRollerSelection(t *testing.T) {
|
||||
func testIdleProverSelection(t *testing.T) {
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
handler, collector := setupCoordinator(t, 1, wsURL, true)
|
||||
@@ -543,27 +543,27 @@ func testIdleRollerSelection(t *testing.T) {
|
||||
collector.Stop()
|
||||
}()
|
||||
|
||||
// create mock rollers.
|
||||
rollers := make([]*mockRoller, 20)
|
||||
for i := 0; i < len(rollers); i++ {
|
||||
// create mock provers.
|
||||
provers := make([]*mockProver, 20)
|
||||
for i := 0; i < len(provers); i++ {
|
||||
var proofType message.ProofType
|
||||
if i%2 == 0 {
|
||||
proofType = message.ProofTypeChunk
|
||||
} else {
|
||||
proofType = message.ProofTypeBatch
|
||||
}
|
||||
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
rollers[i].waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
provers[i] = newMockProver(t, "prover_test"+strconv.Itoa(i), wsURL, proofType)
|
||||
provers[i].waitTaskAndSendProof(t, time.Second, false, verifiedSuccess)
|
||||
}
|
||||
defer func() {
|
||||
// close connection
|
||||
for _, roller := range rollers {
|
||||
roller.close()
|
||||
for _, prover := range provers {
|
||||
prover.close()
|
||||
}
|
||||
}()
|
||||
|
||||
assert.Equal(t, len(rollers)/2, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeChunk))
|
||||
assert.Equal(t, len(rollers)/2, rollermanager.Manager.GetNumberOfIdleRollers(message.ProofTypeBatch))
|
||||
assert.Equal(t, len(provers)/2, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeChunk))
|
||||
assert.Equal(t, len(provers)/2, provermanager.Manager.GetNumberOfIdleProvers(message.ProofTypeBatch))
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*types.WrappedBlock{wrappedBlock1, wrappedBlock2})
|
||||
assert.NoError(t, err)
|
||||
@@ -610,24 +610,24 @@ func testGracefulRestart(t *testing.T) {
|
||||
err = chunkOrm.UpdateBatchHashInRange(context.Background(), 0, 0, batch.Hash)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// create mock roller
|
||||
chunkRoller := newMockRoller(t, "roller_test", wsURL, message.ProofTypeChunk)
|
||||
batchRoller := newMockRoller(t, "roller_test", wsURL, message.ProofTypeBatch)
|
||||
// wait 10 seconds, coordinator restarts before roller submits proof
|
||||
chunkRoller.waitTaskAndSendProof(t, 10*time.Second, false, verifiedSuccess)
|
||||
batchRoller.waitTaskAndSendProof(t, 10*time.Second, false, verifiedSuccess)
|
||||
// create mock prover
|
||||
chunkProver := newMockProver(t, "prover_test", wsURL, message.ProofTypeChunk)
|
||||
batchProver := newMockProver(t, "prover_test", wsURL, message.ProofTypeBatch)
|
||||
// wait 10 seconds, coordinator restarts before prover submits proof
|
||||
chunkProver.waitTaskAndSendProof(t, 10*time.Second, false, verifiedSuccess)
|
||||
batchProver.waitTaskAndSendProof(t, 10*time.Second, false, verifiedSuccess)
|
||||
|
||||
// wait for coordinator to dispatch task
|
||||
<-time.After(5 * time.Second)
|
||||
// the coordinator will delete the roller if the subscription is closed.
|
||||
chunkRoller.close()
|
||||
batchRoller.close()
|
||||
// the coordinator will delete the prover if the subscription is closed.
|
||||
chunkProver.close()
|
||||
batchProver.close()
|
||||
|
||||
provingStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, types.ProvingTaskAssigned, provingStatus)
|
||||
|
||||
// Close rollerManager and ws handler.
|
||||
// Close proverManager and ws handler.
|
||||
handler.Shutdown(context.Background())
|
||||
collector.Stop()
|
||||
|
||||
@@ -638,7 +638,7 @@ func testGracefulRestart(t *testing.T) {
|
||||
newCollector.Stop()
|
||||
}()
|
||||
|
||||
// at this point, roller haven't submitted
|
||||
// at this point, prover haven't submitted
|
||||
status, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, types.ProvingTaskAssigned, status)
|
||||
@@ -646,12 +646,12 @@ func testGracefulRestart(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, types.ProvingTaskUnassigned, status) // chunk proofs not ready yet
|
||||
|
||||
// will overwrite the roller client for `SubmitProof`
|
||||
chunkRoller.waitTaskAndSendProof(t, time.Second, true, verifiedSuccess)
|
||||
batchRoller.waitTaskAndSendProof(t, time.Second, true, verifiedSuccess)
|
||||
// will overwrite the prover client for `SubmitProof`
|
||||
chunkProver.waitTaskAndSendProof(t, time.Second, true, verifiedSuccess)
|
||||
batchProver.waitTaskAndSendProof(t, time.Second, true, verifiedSuccess)
|
||||
defer func() {
|
||||
chunkRoller.close()
|
||||
batchRoller.close()
|
||||
chunkProver.close()
|
||||
batchProver.close()
|
||||
}()
|
||||
|
||||
// verify proof status
|
||||
@@ -662,8 +662,8 @@ func testGracefulRestart(t *testing.T) {
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
// this proves that the roller submits to the new coordinator,
|
||||
// because the roller client for `submitProof` has been overwritten
|
||||
// this proves that the prover submits to the new coordinator,
|
||||
// because the prover client for `submitProof` has been overwritten
|
||||
chunkProofStatus, err := chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
batchProofStatus, err := batchOrm.GetProvingStatusByHash(context.Background(), batch.Hash)
|
||||
|
||||
@@ -25,8 +25,8 @@ const (
|
||||
generatedFailed
|
||||
)
|
||||
|
||||
type mockRoller struct {
|
||||
rollerName string
|
||||
type mockProver struct {
|
||||
proverName string
|
||||
privKey *ecdsa.PrivateKey
|
||||
proofType message.ProofType
|
||||
|
||||
@@ -40,26 +40,26 @@ type mockRoller struct {
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func newMockRoller(t *testing.T, rollerName string, wsURL string, proofType message.ProofType) *mockRoller {
|
||||
func newMockProver(t *testing.T, proverName string, wsURL string, proofType message.ProofType) *mockProver {
|
||||
privKey, err := crypto.GenerateKey()
|
||||
assert.NoError(t, err)
|
||||
|
||||
roller := &mockRoller{
|
||||
rollerName: rollerName,
|
||||
prover := &mockProver{
|
||||
proverName: proverName,
|
||||
privKey: privKey,
|
||||
proofType: proofType,
|
||||
wsURL: wsURL,
|
||||
taskCh: make(chan *message.TaskMsg, 4),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
roller.client, roller.sub, err = roller.connectToCoordinator()
|
||||
prover.client, prover.sub, err = prover.connectToCoordinator()
|
||||
assert.NoError(t, err)
|
||||
|
||||
return roller
|
||||
return prover
|
||||
}
|
||||
|
||||
// connectToCoordinator sets up a websocket client to connect to the roller manager.
|
||||
func (r *mockRoller) connectToCoordinator() (*client2.Client, ethereum.Subscription, error) {
|
||||
// connectToCoordinator sets up a websocket client to connect to the prover manager.
|
||||
func (r *mockProver) connectToCoordinator() (*client2.Client, ethereum.Subscription, error) {
|
||||
// Create connection.
|
||||
client, err := client2.Dial(r.wsURL)
|
||||
if err != nil {
|
||||
@@ -69,8 +69,8 @@ func (r *mockRoller) connectToCoordinator() (*client2.Client, ethereum.Subscript
|
||||
// create a new ws connection
|
||||
authMsg := &message.AuthMsg{
|
||||
Identity: &message.Identity{
|
||||
Name: r.rollerName,
|
||||
RollerType: r.proofType,
|
||||
Name: r.proverName,
|
||||
ProverType: r.proofType,
|
||||
},
|
||||
}
|
||||
_ = authMsg.SignWithKey(r.privKey)
|
||||
@@ -90,7 +90,7 @@ func (r *mockRoller) connectToCoordinator() (*client2.Client, ethereum.Subscript
|
||||
return client, sub, nil
|
||||
}
|
||||
|
||||
func (r *mockRoller) releaseTasks() {
|
||||
func (r *mockProver) releaseTasks() {
|
||||
r.taskCache.Range(func(key, value any) bool {
|
||||
r.taskCh <- value.(*message.TaskMsg)
|
||||
r.taskCache.Delete(key)
|
||||
@@ -98,10 +98,10 @@ func (r *mockRoller) releaseTasks() {
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for the proof task, after receiving the proof task, roller submits proof after proofTime secs.
|
||||
func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool, proofStatus proofStatus) {
|
||||
// simulating the case that the roller first disconnects and then reconnects to the coordinator
|
||||
// the Subscription and its `Err()` channel will be closed, and the coordinator will `freeRoller()`
|
||||
// Wait for the proof task, after receiving the proof task, prover submits proof after proofTime secs.
|
||||
func (r *mockProver) waitTaskAndSendProof(t *testing.T, proofTime time.Duration, reconnect bool, proofStatus proofStatus) {
|
||||
// simulating the case that the prover first disconnects and then reconnects to the coordinator
|
||||
// the Subscription and its `Err()` channel will be closed, and the coordinator will `freeProver()`
|
||||
if reconnect {
|
||||
var err error
|
||||
r.client, r.sub, err = r.connectToCoordinator()
|
||||
@@ -118,7 +118,7 @@ func (r *mockRoller) waitTaskAndSendProof(t *testing.T, proofTime time.Duration,
|
||||
go r.loop(t, r.client, proofTime, proofStatus, r.stopCh)
|
||||
}
|
||||
|
||||
func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.Duration, proofStatus proofStatus, stopCh chan struct{}) {
|
||||
func (r *mockProver) loop(t *testing.T, client *client2.Client, proofTime time.Duration, proofStatus proofStatus, stopCh chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case task := <-r.taskCh:
|
||||
@@ -150,7 +150,7 @@ func (r *mockRoller) loop(t *testing.T, client *client2.Client, proofTime time.D
|
||||
}
|
||||
}
|
||||
|
||||
func (r *mockRoller) close() {
|
||||
func (r *mockProver) close() {
|
||||
close(r.stopCh)
|
||||
r.sub.Unsubscribe()
|
||||
}
|
||||
@@ -32,7 +32,7 @@ comment
|
||||
on column prover_task.task_type is 'undefined, chunk, batch';
|
||||
|
||||
comment
|
||||
on column prover_task.proving_status is 'undefined, roller assigned, roller proof valid, roller proof invalid';
|
||||
on column prover_task.proving_status is 'undefined, prover assigned, prover proof valid, prover proof invalid';
|
||||
|
||||
comment
|
||||
on column prover_task.failure_type is 'undefined';
|
||||
|
||||
2
go.work
2
go.work
@@ -7,6 +7,6 @@ use (
|
||||
./coordinator
|
||||
./database
|
||||
./prover-stats-api
|
||||
./roller
|
||||
./prover
|
||||
./tests/integration-test
|
||||
)
|
||||
|
||||
@@ -250,7 +250,7 @@ var SwaggerInfo = &swag.Spec{
|
||||
Host: "localhost:8990",
|
||||
BasePath: "/api/v1",
|
||||
Schemes: []string{},
|
||||
Title: "Zero-knowledge Prover Stats API",
|
||||
Title: "Scroll Prover Stats API",
|
||||
Description: "This is an API server for Provers.",
|
||||
InfoInstanceName: "swagger",
|
||||
SwaggerTemplate: docTemplate,
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
"swagger": "2.0",
|
||||
"info": {
|
||||
"description": "This is an API server for Provers.",
|
||||
"title": "Zero-knowledge Prover Stats API",
|
||||
"title": "Scroll Prover Stats API",
|
||||
"contact": {
|
||||
"name": "Prover Stats API Support",
|
||||
"email": "Be Pending"
|
||||
|
||||
@@ -38,7 +38,7 @@ info:
|
||||
license:
|
||||
name: Apache 2.0
|
||||
url: http://www.apache.org/licenses/LICENSE-2.0.html
|
||||
title: Zero-knowledge Prover Stats API
|
||||
title: Scroll Prover Stats API
|
||||
version: "1.0"
|
||||
paths:
|
||||
/api/prover_task/v1/request_token:
|
||||
|
||||
@@ -26,7 +26,7 @@ var (
|
||||
TaskID: "1",
|
||||
ProverPublicKey: proverPubkey,
|
||||
ProverName: "prover-0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromInt(10),
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ var (
|
||||
TaskID: "2",
|
||||
ProverPublicKey: proverPubkey,
|
||||
ProverName: "prover-1",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromInt(12),
|
||||
}
|
||||
|
||||
|
||||
@@ -67,9 +67,9 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
|
||||
proverTask := ProverTask{
|
||||
TaskID: "test-hash",
|
||||
ProverName: "roller-0",
|
||||
ProverName: "prover-0",
|
||||
ProverPublicKey: "0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromBigInt(reward, 0),
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ func TestProverTaskOrm(t *testing.T) {
|
||||
assert.Equal(t, resultReward, reward)
|
||||
assert.Equal(t, resultReward.String(), "18446744073709551616")
|
||||
|
||||
proverTask.ProvingStatus = int16(types.RollerProofValid)
|
||||
proverTask.ProvingStatus = int16(types.ProverProofValid)
|
||||
err = proverTaskOrm.SetProverTask(context.Background(), &proverTask)
|
||||
assert.NoError(t, err)
|
||||
getTask, err = proverTaskOrm.GetProverTasksByHash(context.Background(), "test-hash")
|
||||
@@ -102,9 +102,9 @@ func TestProverTaskOrmUint256(t *testing.T) {
|
||||
rewardUint256.SetString("115792089237316195423570985008687907853269984665640564039457584007913129639935", 10)
|
||||
proverTask := ProverTask{
|
||||
TaskID: "test-hash",
|
||||
ProverName: "roller-0",
|
||||
ProverName: "prover-0",
|
||||
ProverPublicKey: "0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromBigInt(rewardUint256, 0),
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// ProverTask is assigned rollers info of chunk/batch proof prover task
|
||||
// ProverTask is assigned provers info of chunk/batch proof prover task
|
||||
type ProverTask struct {
|
||||
db *gorm.DB `gorm:"column:-"`
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ var (
|
||||
TaskID: "1",
|
||||
ProverPublicKey: proverPubkey,
|
||||
ProverName: "prover-0",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromInt(10),
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ var (
|
||||
TaskID: "2",
|
||||
ProverPublicKey: proverPubkey,
|
||||
ProverName: "prover-1",
|
||||
ProvingStatus: int16(types.RollerAssigned),
|
||||
ProvingStatus: int16(types.ProverAssigned),
|
||||
Reward: decimal.NewFromInt(12),
|
||||
}
|
||||
)
|
||||
|
||||
0
roller/.gitignore → prover/.gitignore
vendored
0
roller/.gitignore → prover/.gitignore
vendored
@@ -1,4 +1,4 @@
|
||||
.PHONY: lint docker clean roller mock_roller
|
||||
.PHONY: lint docker clean prover mock_prover
|
||||
|
||||
ifeq (4.3,$(firstword $(sort $(MAKE_VERSION) 4.3)))
|
||||
ZKEVM_VERSION=$(shell grep -m 1 "scroll-prover" ../common/libzkp/impl/Cargo.lock | cut -d "#" -f2 | cut -c-7)
|
||||
@@ -20,17 +20,17 @@ endif
|
||||
|
||||
libzkp:
|
||||
cd ../common/libzkp/impl && cargo clean && cargo build --release && cp ./target/release/libzkp.so ../interface/
|
||||
rm -rf ./prover/lib && cp -r ../common/libzkp/interface ./prover/lib
|
||||
find ../common | grep libzktrie.so | xargs -I{} cp {} ./prover/lib/
|
||||
rm -rf ./core/lib && cp -r ../common/libzkp/interface ./core/lib
|
||||
find ../common | grep libzktrie.so | xargs -I{} cp {} ./core/lib/
|
||||
|
||||
roller: libzkp ## Build the Roller instance.
|
||||
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/roller ./cmd
|
||||
prover: libzkp ## Build the prover instance.
|
||||
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/prover ./cmd
|
||||
|
||||
mock_roller: ## Build the mocked Roller instance.
|
||||
GOBIN=$(PWD)/build/bin go build -tags="mock_prover mock_verifier" -o $(PWD)/build/bin/roller ./cmd
|
||||
mock_prover: ## Build the mocked prover instance.
|
||||
GOBIN=$(PWD)/build/bin go build -tags="mock_prover mock_verifier" -o $(PWD)/build/bin/prover ./cmd
|
||||
|
||||
gpu-roller: libzkp ## Build the GPU Roller instance.
|
||||
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -tags gpu -o $(PWD)/build/bin/roller ./cmd
|
||||
gpu-prover: libzkp ## Build the GPU prover instance.
|
||||
GOBIN=$(PWD)/build/bin go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -tags gpu -o $(PWD)/build/bin/prover ./cmd
|
||||
|
||||
test-prover: libzkp
|
||||
go test -tags ffi -timeout 0 -v ./prover
|
||||
@@ -42,7 +42,7 @@ lastest-zk-version:
|
||||
curl -sL https://api.github.com/repos/scroll-tech/scroll-prover/commits | jq -r ".[0].sha"
|
||||
|
||||
lint: ## Lint the files - used for CI
|
||||
cp -r ../common/libzkp/interface ./prover/lib
|
||||
cp -r ../common/libzkp/interface ./core/lib
|
||||
GOBIN=$(PWD)/build/bin go run ../build/lint.go
|
||||
|
||||
clean: ## Empty out the bin folder
|
||||
@@ -1,12 +1,12 @@
|
||||
# Roller
|
||||
# Prover
|
||||
|
||||
This directory contains the Scroll prover (aka "roller") module.
|
||||
This directory contains the Scroll prover (aka "prover") module.
|
||||
|
||||
|
||||
## Build
|
||||
```bash
|
||||
make clean
|
||||
make roller
|
||||
make prover
|
||||
```
|
||||
The built prover binary is in the build/bin directory.
|
||||
|
||||
@@ -22,7 +22,7 @@ make lint
|
||||
For current unit tests, run:
|
||||
|
||||
```bash
|
||||
make roller
|
||||
make prover
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./prover/lib
|
||||
export CHAIN_ID=534353 # for Scroll Alpha
|
||||
go test -v ./...
|
||||
@@ -31,13 +31,13 @@ go test -v ./...
|
||||
When you need to mock prover results and run other prover tests (using [`prover/mock.go`](prover/mock.go) instead of [`prover/prover.go`](prover/prover.go)), run:
|
||||
|
||||
```bash
|
||||
go test -tags="mock_prover" -v -race -covermode=atomic scroll-tech/roller/...
|
||||
go test -tags="mock_prover" -v -race -covermode=atomic scroll-tech/prover/...
|
||||
```
|
||||
|
||||
|
||||
## Configure
|
||||
|
||||
The prover behavior can be configured using [`config.json`](config.json). Check the code comments of `Config` and `ProverConfig` in [`config/config.go`](config/config.go), and `NewRoller` in [`roller.go`](roller.go) for more details.
|
||||
The prover behavior can be configured using [`config.json`](conf/config.json). Check the code comments of `Config` and `ProverConfig` in [`config/config.go`](config/config.go), and `NewProver` in [`prover.go`](prover.go) for more details.
|
||||
|
||||
|
||||
## Start
|
||||
@@ -53,7 +53,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./prover/lib
|
||||
2. Start the module using settings from config.json:
|
||||
|
||||
```bash
|
||||
./build/bin/roller
|
||||
./build/bin/prover
|
||||
```
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./prover/lib
|
||||
|
||||
## `cmd/app/app.go`
|
||||
|
||||
This file defines the main entry point for the prover application, initializes prover instances via roller.go, and handles graceful shutdowns. The prover (`cmd/app/app.go`) calls `NewRoller` with config.json parsed and cfg passed to `roller.go`. It then starts creating new instances of provers via `r.Start` and starts the main processing loop for generating proofs dispatched from the coordinator.
|
||||
This file defines the main entry point for the prover application, initializes prover instances via prover.go, and handles graceful shutdowns. The prover (`cmd/app/app.go`) calls `NewProver` with config.json parsed and cfg passed to `prover.go`. It then starts creating new instances of provers via `r.Start` and starts the main processing loop for generating proofs dispatched from the coordinator.
|
||||
|
||||
Multiple prover can be started separately and registered with the coordinator via its API.
|
||||
|
||||
@@ -71,11 +71,11 @@ Multiple prover can be started separately and registered with the coordinator vi
|
||||
This file wrapps mock app functions and is used in the [integration test](../tests/integration-test/).
|
||||
|
||||
|
||||
## `roller.go`
|
||||
## `prover.go`
|
||||
|
||||
This file contains the logic of the `roller`, including starting it, registering with the coordinator, handling tasks from the coordinator, and running the proving loop. The `roller` interacts with `prover` and `stack` to perform its functions.
|
||||
This file contains the logic of the `prover`, including starting it, registering with the coordinator, handling tasks from the coordinator, and running the proving loop. The `prover` interacts with `prover` and `stack` to perform its functions.
|
||||
|
||||
`NewRoller`: A constructor function for creating a new `Roller` instance. It initializes it with the provided configuration, loads or creates a private key, initializes the `Stack` and `Prover` instances, and sets up a client connection to the coordinator.
|
||||
`NewProver`: A constructor function for creating a new `Prover` instance. It initializes it with the provided configuration, loads or creates a private key, initializes the `Stack` and `Prover` instances, and sets up a client connection to the coordinator.
|
||||
|
||||
`Start`: Starts the prover and registers it with the coordinator. It contains `Register`, `HandleCoordinator` and `ProveLoop`:
|
||||
|
||||
@@ -88,7 +88,7 @@ This file contains the logic of the `roller`, including starting it, registering
|
||||
Refer to the functions in `stack`, `prover` and `client` modules for more detail.
|
||||
|
||||
|
||||
## `prover/prover.go`
|
||||
## `core/prover.go`
|
||||
|
||||
This file focuses on the go implementation of the `Prover` struct which is responsible for generating proofs from tasks provided by the coordinator. It handles interactions with the rust-prover library via FFI. Refer to `create_agg_proof_multi` in [`../common/libzkp/impl/src/prove.rs`](../common/libzkp/impl/src/prove.rs) for more detail.
|
||||
|
||||
@@ -97,6 +97,6 @@ This file focuses on the go implementation of the `Prover` struct which is respo
|
||||
This file is responsible for managing task storage and retrieval for the prover. It uses a [BBolt database](https://github.com/etcd-io/bbolt) to store tasks and provides various functions like `Push`, `Peek`, `Delete` and `UpdateTimes` in order to interact with them.
|
||||
|
||||
|
||||
## `roller_metrics.go`
|
||||
## `prover_metrics.go`
|
||||
|
||||
This file is called from [`../coordinator/roller_metrics.go`](../coordinator/roller_metrics.go) and is used to collect metrics from the prover.
|
||||
This file is called from [`../coordinator/internal/logic/provermanager/metrics.go`](../coordinator/internal/logic/provermanager/metrics.go) and is used to collect metrics from the prover.
|
||||
@@ -8,12 +8,12 @@ import (
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"scroll-tech/roller"
|
||||
"scroll-tech/prover"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
"scroll-tech/common/version"
|
||||
|
||||
"scroll-tech/roller/config"
|
||||
"scroll-tech/prover/config"
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
@@ -21,16 +21,16 @@ var app *cli.App
|
||||
func init() {
|
||||
app = cli.NewApp()
|
||||
app.Action = action
|
||||
app.Name = "roller"
|
||||
app.Usage = "The Scroll L2 Roller"
|
||||
app.Name = "prover"
|
||||
app.Usage = "The Scroll L2 Prover"
|
||||
app.Version = version.Version
|
||||
app.Flags = append(app.Flags, utils.CommonFlags...)
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return utils.LogSetup(ctx)
|
||||
}
|
||||
|
||||
// Register `roller-test` app for integration-test.
|
||||
utils.RegisterSimulation(app, utils.RollerApp)
|
||||
// Register `prover-test` app for integration-test.
|
||||
utils.RegisterSimulation(app, utils.ProverApp)
|
||||
}
|
||||
|
||||
func action(ctx *cli.Context) error {
|
||||
@@ -41,16 +41,16 @@ func action(ctx *cli.Context) error {
|
||||
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
|
||||
}
|
||||
|
||||
// Create roller
|
||||
r, err := roller.NewRoller(cfg)
|
||||
// Create prover
|
||||
r, err := prover.NewProver(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Start roller.
|
||||
// Start prover.
|
||||
r.Start()
|
||||
|
||||
defer r.Stop()
|
||||
log.Info("roller start successfully", "name", cfg.RollerName, "publickey", r.PublicKey(), "version", version.Version)
|
||||
log.Info("prover start successfully", "name", cfg.ProverName, "publickey", r.PublicKey(), "version", version.Version)
|
||||
|
||||
// Catch CTRL-C to ensure a graceful shutdown.
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
@@ -62,7 +62,7 @@ func action(ctx *cli.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run the roller cmd func.
|
||||
// Run the prover cmd func.
|
||||
func Run() {
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, _ = fmt.Fprintln(os.Stderr, err)
|
||||
19
prover/cmd/app/app_test.go
Normal file
19
prover/cmd/app/app_test.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"scroll-tech/common/cmd"
|
||||
"scroll-tech/common/version"
|
||||
)
|
||||
|
||||
func TestRunProver(t *testing.T) {
|
||||
prover := cmd.NewCmd("prover-test", "--version")
|
||||
defer prover.WaitExit()
|
||||
|
||||
// wait result
|
||||
prover.ExpectWithTimeout(t, true, time.Second*3, fmt.Sprintf("prover version %s", version.Version))
|
||||
prover.RunApp(nil)
|
||||
}
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
rollerConfig "scroll-tech/roller/config"
|
||||
proverConfig "scroll-tech/prover/config"
|
||||
|
||||
"scroll-tech/common/cmd"
|
||||
"scroll-tech/common/docker"
|
||||
@@ -18,22 +18,22 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
rollerIndex int
|
||||
proverIndex int
|
||||
)
|
||||
|
||||
func getIndex() int {
|
||||
defer func() { rollerIndex++ }()
|
||||
return rollerIndex
|
||||
defer func() { proverIndex++ }()
|
||||
return proverIndex
|
||||
}
|
||||
|
||||
// RollerApp roller-test client manager.
|
||||
type RollerApp struct {
|
||||
Config *rollerConfig.Config
|
||||
// ProverApp prover-test client manager.
|
||||
type ProverApp struct {
|
||||
Config *proverConfig.Config
|
||||
|
||||
base *docker.App
|
||||
|
||||
originFile string
|
||||
rollerFile string
|
||||
proverFile string
|
||||
bboltDB string
|
||||
|
||||
index int
|
||||
@@ -42,48 +42,48 @@ type RollerApp struct {
|
||||
docker.AppAPI
|
||||
}
|
||||
|
||||
// NewRollerApp return a new rollerApp manager.
|
||||
func NewRollerApp(base *docker.App, file string, wsURL string) *RollerApp {
|
||||
rollerFile := fmt.Sprintf("/tmp/%d_roller-config.json", base.Timestamp)
|
||||
rollerApp := &RollerApp{
|
||||
// NewProverApp return a new proverApp manager.
|
||||
func NewProverApp(base *docker.App, file string, wsURL string) *ProverApp {
|
||||
proverFile := fmt.Sprintf("/tmp/%d_prover-config.json", base.Timestamp)
|
||||
proverApp := &ProverApp{
|
||||
base: base,
|
||||
originFile: file,
|
||||
rollerFile: rollerFile,
|
||||
proverFile: proverFile,
|
||||
bboltDB: fmt.Sprintf("/tmp/%d_bbolt_db", base.Timestamp),
|
||||
index: getIndex(),
|
||||
name: string(utils.RollerApp),
|
||||
args: []string{"--log.debug", "--config", rollerFile},
|
||||
name: string(utils.ProverApp),
|
||||
args: []string{"--log.debug", "--config", proverFile},
|
||||
}
|
||||
if err := rollerApp.MockConfig(true, wsURL); err != nil {
|
||||
if err := proverApp.MockConfig(true, wsURL); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return rollerApp
|
||||
return proverApp
|
||||
}
|
||||
|
||||
// RunApp run roller-test child process by multi parameters.
|
||||
func (r *RollerApp) RunApp(t *testing.T, args ...string) {
|
||||
// RunApp run prover-test child process by multi parameters.
|
||||
func (r *ProverApp) RunApp(t *testing.T, args ...string) {
|
||||
r.AppAPI = cmd.NewCmd(r.name, append(r.args, args...)...)
|
||||
r.AppAPI.RunApp(func() bool { return r.AppAPI.WaitResult(t, time.Second*40, "roller start successfully") })
|
||||
r.AppAPI.RunApp(func() bool { return r.AppAPI.WaitResult(t, time.Second*40, "prover start successfully") })
|
||||
}
|
||||
|
||||
// Free stop and release roller-test.
|
||||
func (r *RollerApp) Free() {
|
||||
// Free stop and release prover-test.
|
||||
func (r *ProverApp) Free() {
|
||||
if !utils.IsNil(r.AppAPI) {
|
||||
r.AppAPI.WaitExit()
|
||||
}
|
||||
_ = os.Remove(r.rollerFile)
|
||||
_ = os.Remove(r.proverFile)
|
||||
_ = os.Remove(r.Config.KeystorePath)
|
||||
_ = os.Remove(r.bboltDB)
|
||||
}
|
||||
|
||||
// MockConfig creates a new roller config.
|
||||
func (r *RollerApp) MockConfig(store bool, wsURL string) error {
|
||||
cfg, err := rollerConfig.NewConfig(r.originFile)
|
||||
// MockConfig creates a new prover config.
|
||||
func (r *ProverApp) MockConfig(store bool, wsURL string) error {
|
||||
cfg, err := proverConfig.NewConfig(r.originFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.RollerName = fmt.Sprintf("%s_%d", r.name, r.index)
|
||||
cfg.KeystorePath = fmt.Sprintf("/tmp/%d_%s.json", r.base.Timestamp, cfg.RollerName)
|
||||
cfg.ProverName = fmt.Sprintf("%s_%d", r.name, r.index)
|
||||
cfg.KeystorePath = fmt.Sprintf("/tmp/%d_%s.json", r.base.Timestamp, cfg.ProverName)
|
||||
cfg.TraceEndpoint = r.base.L2gethImg.Endpoint()
|
||||
// Reuse l1geth's keystore file
|
||||
cfg.KeystorePassword = "scrolltest"
|
||||
@@ -104,14 +104,14 @@ func (r *RollerApp) MockConfig(store bool, wsURL string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(r.rollerFile, data, 0600)
|
||||
return os.WriteFile(r.proverFile, data, 0600)
|
||||
}
|
||||
|
||||
// RollerApps rollerApp list.
|
||||
type RollerApps []*RollerApp
|
||||
// ProverApps proverApp list.
|
||||
type ProverApps []*ProverApp
|
||||
|
||||
// RunApps starts all the rollerApps.
|
||||
func (r RollerApps) RunApps(t *testing.T, args ...string) {
|
||||
// RunApps starts all the proverApps.
|
||||
func (r ProverApps) RunApps(t *testing.T, args ...string) {
|
||||
var eg errgroup.Group
|
||||
for i := range r {
|
||||
i := i
|
||||
@@ -123,20 +123,20 @@ func (r RollerApps) RunApps(t *testing.T, args ...string) {
|
||||
_ = eg.Wait()
|
||||
}
|
||||
|
||||
// MockConfigs creates all the rollerApps' configs.
|
||||
func (r RollerApps) MockConfigs(store bool, wsURL string) error {
|
||||
// MockConfigs creates all the proverApps' configs.
|
||||
func (r ProverApps) MockConfigs(store bool, wsURL string) error {
|
||||
var eg errgroup.Group
|
||||
for _, roller := range r {
|
||||
roller := roller
|
||||
for _, prover := range r {
|
||||
prover := prover
|
||||
eg.Go(func() error {
|
||||
return roller.MockConfig(store, wsURL)
|
||||
return prover.MockConfig(store, wsURL)
|
||||
})
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
// Free releases rollerApps.
|
||||
func (r RollerApps) Free() {
|
||||
// Free releases proverApps.
|
||||
func (r ProverApps) Free() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(r))
|
||||
for i := range r {
|
||||
@@ -149,8 +149,8 @@ func (r RollerApps) Free() {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// WaitExit wait rollerApps stopped.
|
||||
func (r RollerApps) WaitExit() {
|
||||
// WaitExit wait proverApps stopped.
|
||||
func (r ProverApps) WaitExit() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(r))
|
||||
for i := range r {
|
||||
@@ -1,6 +1,6 @@
|
||||
package main
|
||||
|
||||
import "scroll-tech/roller/cmd/app"
|
||||
import "scroll-tech/prover/cmd/app"
|
||||
|
||||
func main() {
|
||||
app.Run()
|
||||
@@ -1,10 +1,10 @@
|
||||
{
|
||||
"roller_name": "my_roller",
|
||||
"prover_name": "my_prover",
|
||||
"keystore_path": "keystore.json",
|
||||
"keystore_password": "roller-pwd",
|
||||
"keystore_password": "prover-pwd",
|
||||
"coordinator_url": "ws://localhost:8391",
|
||||
"db_path": "bbolt_db",
|
||||
"prover": {
|
||||
"core": {
|
||||
"params_path": "params",
|
||||
"seed_path": "seed"
|
||||
}
|
||||
@@ -10,22 +10,22 @@ import (
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
)
|
||||
|
||||
// Config loads roller configuration items.
|
||||
// Config loads prover configuration items.
|
||||
type Config struct {
|
||||
RollerName string `json:"roller_name"`
|
||||
KeystorePath string `json:"keystore_path"`
|
||||
KeystorePassword string `json:"keystore_password"`
|
||||
CoordinatorURL string `json:"coordinator_url"`
|
||||
TraceEndpoint string `json:"trace_endpoint"`
|
||||
Prover *ProverConfig `json:"prover"`
|
||||
DBPath string `json:"db_path"`
|
||||
ProverName string `json:"prover_name"`
|
||||
KeystorePath string `json:"keystore_path"`
|
||||
KeystorePassword string `json:"keystore_password"`
|
||||
CoordinatorURL string `json:"coordinator_url"`
|
||||
TraceEndpoint string `json:"trace_endpoint"`
|
||||
Core *ProverCoreConfig `json:"core"`
|
||||
DBPath string `json:"db_path"`
|
||||
}
|
||||
|
||||
// ProverConfig load zk prover config.
|
||||
type ProverConfig struct {
|
||||
// ProverCoreConfig load zk prover config.
|
||||
type ProverCoreConfig struct {
|
||||
ParamsPath string `json:"params_path"`
|
||||
SeedPath string `json:"seed_path"`
|
||||
ProofType message.ProofType `json:"prove_type,omitempty"` // 0: basic roller (default type), 1: aggregator roller
|
||||
ProofType message.ProofType `json:"proof_type,omitempty"` // 0: chunk proof (default type), 1: batch proof
|
||||
DumpDir string `json:"dump_dir,omitempty"`
|
||||
}
|
||||
|
||||
34
prover/core/mock.go
Normal file
34
prover/core/mock.go
Normal file
@@ -0,0 +1,34 @@
|
||||
//go:build mock_prover
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/prover/config"
|
||||
)
|
||||
|
||||
// ProverCore sends block-traces to rust-prover through socket and get back the zk-proof.
|
||||
type ProverCore struct {
|
||||
cfg *config.ProverCoreConfig
|
||||
}
|
||||
|
||||
// NewProverCore inits a ProverCore object.
|
||||
func NewProverCore(cfg *config.ProverCoreConfig) (*ProverCore, error) {
|
||||
return &ProverCore{cfg: cfg}, nil
|
||||
}
|
||||
|
||||
// Prove call rust ffi to generate proof, if first failed, try again.
|
||||
func (p *ProverCore) Prove(taskID string, traces []*types.BlockTrace) (*message.AggProof, error) {
|
||||
_empty := common.BigToHash(big.NewInt(0))
|
||||
return &message.AggProof{
|
||||
Proof: _empty[:],
|
||||
Instance: _empty[:],
|
||||
FinalPair: _empty[:],
|
||||
}, nil
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
//go:build !mock_prover
|
||||
|
||||
package prover
|
||||
package core
|
||||
|
||||
/*
|
||||
#cgo LDFLAGS: -lzkp -lm -ldl -lzktrie -L${SRCDIR}/lib/ -Wl,-rpath=${SRCDIR}/lib
|
||||
@@ -21,16 +21,16 @@ import (
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/roller/config"
|
||||
"scroll-tech/prover/config"
|
||||
)
|
||||
|
||||
// Prover sends block-traces to rust-prover through ffi and get back the zk-proof.
|
||||
type Prover struct {
|
||||
cfg *config.ProverConfig
|
||||
// ProverCore sends block-traces to rust-prover through ffi and get back the zk-proof.
|
||||
type ProverCore struct {
|
||||
cfg *config.ProverCoreConfig
|
||||
}
|
||||
|
||||
// NewProver inits a Prover object.
|
||||
func NewProver(cfg *config.ProverConfig) (*Prover, error) {
|
||||
// NewProverCore inits a Core object.
|
||||
func NewProverCore(cfg *config.ProverCoreConfig) (*ProverCore, error) {
|
||||
paramsPathStr := C.CString(cfg.ParamsPath)
|
||||
seedPathStr := C.CString(cfg.SeedPath)
|
||||
defer func() {
|
||||
@@ -47,11 +47,11 @@ func NewProver(cfg *config.ProverConfig) (*Prover, error) {
|
||||
log.Info("Enabled dump_proof", "dir", cfg.DumpDir)
|
||||
}
|
||||
|
||||
return &Prover{cfg: cfg}, nil
|
||||
return &ProverCore{cfg: cfg}, nil
|
||||
}
|
||||
|
||||
// Prove call rust ffi to generate proof, if first failed, try again.
|
||||
func (p *Prover) Prove(taskID string, traces []*types.BlockTrace) (*message.AggProof, error) {
|
||||
func (p *ProverCore) Prove(taskID string, traces []*types.BlockTrace) (*message.AggProof, error) {
|
||||
var proofByt []byte
|
||||
if p.cfg.ProofType == message.ProofTypeChunk {
|
||||
tracesByt, err := json.Marshal(traces)
|
||||
@@ -74,7 +74,7 @@ func (p *Prover) Prove(taskID string, traces []*types.BlockTrace) (*message.AggP
|
||||
}
|
||||
|
||||
// Call cgo to generate proof.
|
||||
func (p *Prover) prove(tracesByt []byte) []byte {
|
||||
func (p *ProverCore) prove(tracesByt []byte) []byte {
|
||||
tracesStr := C.CString(string(tracesByt))
|
||||
|
||||
defer func() {
|
||||
@@ -89,7 +89,7 @@ func (p *Prover) prove(tracesByt []byte) []byte {
|
||||
return []byte(proof)
|
||||
}
|
||||
|
||||
func (p *Prover) dumpProof(id string, proofByt []byte) error {
|
||||
func (p *ProverCore) dumpProof(id string, proofByt []byte) error {
|
||||
if p.cfg.DumpDir == "" {
|
||||
return nil
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
//go:build ffi
|
||||
|
||||
package prover_test
|
||||
package core_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -13,8 +13,8 @@ import (
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"scroll-tech/roller/config"
|
||||
"scroll-tech/roller/prover"
|
||||
"scroll-tech/prover/config"
|
||||
"scroll-tech/prover/prover"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -26,7 +26,7 @@ var (
|
||||
|
||||
func TestFFI(t *testing.T) {
|
||||
as := assert.New(t)
|
||||
cfg := &config.ProverConfig{
|
||||
cfg := &config.ProverCoreConfig{
|
||||
ParamsPath: *paramsPath,
|
||||
SeedPath: *seedPath,
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
module scroll-tech/roller
|
||||
module scroll-tech/prover
|
||||
|
||||
go 1.19
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package roller
|
||||
package prover
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -9,12 +9,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
@@ -23,9 +22,9 @@ import (
|
||||
|
||||
"scroll-tech/coordinator/client"
|
||||
|
||||
"scroll-tech/roller/config"
|
||||
"scroll-tech/roller/prover"
|
||||
"scroll-tech/roller/store"
|
||||
"scroll-tech/prover/config"
|
||||
"scroll-tech/prover/core"
|
||||
"scroll-tech/prover/store"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -33,13 +32,13 @@ var (
|
||||
retryWait = time.Second * 10
|
||||
)
|
||||
|
||||
// Roller contains websocket conn to coordinator, Stack, unix-socket to ipc-prover.
|
||||
type Roller struct {
|
||||
// Prover contains websocket conn to coordinator, Stack, unix-socket to ipc-proverCore.
|
||||
type Prover struct {
|
||||
cfg *config.Config
|
||||
client *client.Client
|
||||
traceClient *ethclient.Client
|
||||
stack *store.Stack
|
||||
prover *prover.Prover
|
||||
proverCore *core.ProverCore
|
||||
taskChan chan *message.TaskMsg
|
||||
sub ethereum.Subscription
|
||||
|
||||
@@ -50,8 +49,8 @@ type Roller struct {
|
||||
priv *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// NewRoller new a Roller object.
|
||||
func NewRoller(cfg *config.Config) (*Roller, error) {
|
||||
// NewProver new a Prover object.
|
||||
func NewProver(cfg *config.Config) (*Prover, error) {
|
||||
// load or create wallet
|
||||
priv, err := utils.LoadOrCreateKey(cfg.KeystorePath, cfg.KeystorePassword)
|
||||
if err != nil {
|
||||
@@ -70,25 +69,25 @@ func NewRoller(cfg *config.Config) (*Roller, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create prover instance
|
||||
log.Info("init prover")
|
||||
newProver, err := prover.NewProver(cfg.Prover)
|
||||
// Create proverCore instance
|
||||
log.Info("init proverCore")
|
||||
proverCore, err := core.NewProverCore(cfg.Core)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Info("init prover successfully!")
|
||||
log.Info("init proverCore successfully!")
|
||||
|
||||
rClient, err := client.Dial(cfg.CoordinatorURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Roller{
|
||||
return &Prover{
|
||||
cfg: cfg,
|
||||
client: rClient,
|
||||
traceClient: traceClient,
|
||||
stack: stackDb,
|
||||
prover: newProver,
|
||||
proverCore: proverCore,
|
||||
sub: nil,
|
||||
taskChan: make(chan *message.TaskMsg, 10),
|
||||
stopChan: make(chan struct{}),
|
||||
@@ -96,18 +95,18 @@ func NewRoller(cfg *config.Config) (*Roller, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Type returns roller type.
|
||||
func (r *Roller) Type() message.ProofType {
|
||||
return r.cfg.Prover.ProofType
|
||||
// Type returns proverCore type.
|
||||
func (r *Prover) Type() message.ProofType {
|
||||
return r.cfg.Core.ProofType
|
||||
}
|
||||
|
||||
// PublicKey translate public key to hex and return.
|
||||
func (r *Roller) PublicKey() string {
|
||||
func (r *Prover) PublicKey() string {
|
||||
return common.Bytes2Hex(crypto.CompressPubkey(&r.priv.PublicKey))
|
||||
}
|
||||
|
||||
// Start runs Roller.
|
||||
func (r *Roller) Start() {
|
||||
// Start runs Prover.
|
||||
func (r *Prover) Start() {
|
||||
log.Info("start to register to coordinator")
|
||||
if err := r.Register(); err != nil {
|
||||
log.Crit("register to coordinator failed", "error", err)
|
||||
@@ -118,12 +117,12 @@ func (r *Roller) Start() {
|
||||
go r.ProveLoop()
|
||||
}
|
||||
|
||||
// Register registers Roller to the coordinator through Websocket.
|
||||
func (r *Roller) Register() error {
|
||||
// Register registers Prover to the coordinator through Websocket.
|
||||
func (r *Prover) Register() error {
|
||||
authMsg := &message.AuthMsg{
|
||||
Identity: &message.Identity{
|
||||
Name: r.cfg.RollerName,
|
||||
RollerType: r.Type(),
|
||||
Name: r.cfg.ProverName,
|
||||
ProverType: r.Type(),
|
||||
Version: version.Version,
|
||||
},
|
||||
}
|
||||
@@ -149,7 +148,7 @@ func (r *Roller) Register() error {
|
||||
}
|
||||
|
||||
// HandleCoordinator accepts block-traces from coordinator through the Websocket and store it into Stack.
|
||||
func (r *Roller) HandleCoordinator() {
|
||||
func (r *Prover) HandleCoordinator() {
|
||||
for {
|
||||
select {
|
||||
case <-r.stopChan:
|
||||
@@ -170,7 +169,7 @@ func (r *Roller) HandleCoordinator() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Roller) mustRetryCoordinator() {
|
||||
func (r *Prover) mustRetryCoordinator() {
|
||||
atomic.StoreInt64(&r.isDisconnected, 1)
|
||||
defer atomic.StoreInt64(&r.isDisconnected, 0)
|
||||
for {
|
||||
@@ -187,8 +186,8 @@ func (r *Roller) mustRetryCoordinator() {
|
||||
|
||||
}
|
||||
|
||||
// ProveLoop keep popping the block-traces from Stack and sends it to rust-prover for loop.
|
||||
func (r *Roller) ProveLoop() {
|
||||
// ProveLoop keep popping the block-traces from Stack and sends it to rust-proverCore for loop.
|
||||
func (r *Prover) ProveLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-r.stopChan:
|
||||
@@ -206,7 +205,7 @@ func (r *Roller) ProveLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Roller) prove() error {
|
||||
func (r *Prover) prove() error {
|
||||
task, err := r.stack.Peek()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -233,10 +232,10 @@ func (r *Roller) prove() error {
|
||||
}
|
||||
log.Error("get traces failed!", "task-id", task.Task.ID, "err", err)
|
||||
} else {
|
||||
// If FFI panic during Prove, the roller will restart and re-enter prove() function,
|
||||
// If FFI panic during Prove, the proverCore will restart and re-enter prove() function,
|
||||
// the proof will not be submitted.
|
||||
var proof *message.AggProof
|
||||
proof, err = r.prover.Prove(task.Task.ID, traces)
|
||||
proof, err = r.proverCore.Prove(task.Task.ID, traces)
|
||||
if err != nil {
|
||||
proofMsg = &message.ProofDetail{
|
||||
Status: message.StatusProofError,
|
||||
@@ -257,7 +256,7 @@ func (r *Roller) prove() error {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// when the roller has more than 3 times panic,
|
||||
// when the prover has more than 3 times panic,
|
||||
// it will omit to prove the task, submit StatusProofError and then Delete the task.
|
||||
proofMsg = &message.ProofDetail{
|
||||
Status: message.StatusProofError,
|
||||
@@ -271,7 +270,7 @@ func (r *Roller) prove() error {
|
||||
defer func() {
|
||||
err = r.stack.Delete(task.Task.ID)
|
||||
if err != nil {
|
||||
log.Error("roller stack pop failed!", "err", err)
|
||||
log.Error("prover stack pop failed!", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -279,7 +278,7 @@ func (r *Roller) prove() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Roller) signAndSubmitProof(msg *message.ProofDetail) {
|
||||
func (r *Prover) signAndSubmitProof(msg *message.ProofDetail) {
|
||||
authZkProof := &message.ProofMsg{ProofDetail: msg}
|
||||
if err := authZkProof.Sign(r.priv); err != nil {
|
||||
log.Error("sign proof error", "err", err)
|
||||
@@ -288,8 +287,8 @@ func (r *Roller) signAndSubmitProof(msg *message.ProofDetail) {
|
||||
|
||||
// Retry SubmitProof several times.
|
||||
for i := 0; i < 3; i++ {
|
||||
// When the roller is disconnected from the coordinator,
|
||||
// wait until the roller reconnects to the coordinator.
|
||||
// When the prover is disconnected from the coordinator,
|
||||
// wait until the prover reconnects to the coordinator.
|
||||
for atomic.LoadInt64(&r.isDisconnected) == 1 {
|
||||
time.Sleep(retryWait)
|
||||
}
|
||||
@@ -301,7 +300,7 @@ func (r *Roller) signAndSubmitProof(msg *message.ProofDetail) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Roller) getSortedTracesByHashes(blockHashes []common.Hash) ([]*types.BlockTrace, error) {
|
||||
func (r *Prover) getSortedTracesByHashes(blockHashes []common.Hash) ([]*types.BlockTrace, error) {
|
||||
var traces []*types.BlockTrace
|
||||
for _, blockHash := range blockHashes {
|
||||
trace, err := r.traceClient.GetBlockTraceByHash(context.Background(), blockHash)
|
||||
@@ -319,7 +318,7 @@ func (r *Roller) getSortedTracesByHashes(blockHashes []common.Hash) ([]*types.Bl
|
||||
}
|
||||
|
||||
// Stop closes the websocket connection.
|
||||
func (r *Roller) Stop() {
|
||||
func (r *Prover) Stop() {
|
||||
if atomic.LoadInt64(&r.isClosed) == 1 {
|
||||
return
|
||||
}
|
||||
@@ -24,7 +24,7 @@ type Stack struct {
|
||||
// It contains TaskMsg and proved times.
|
||||
type ProvingTask struct {
|
||||
Task *message.TaskMsg `json:"task"`
|
||||
// Times is how many times roller proved.
|
||||
// Times is how many times prover proved.
|
||||
Times int `json:"times"`
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ func (s *Stack) Delete(taskID string) error {
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateTimes udpates the roller prove times of the proving task.
|
||||
// UpdateTimes udpates the prover prove times of the proving task.
|
||||
func (s *Stack) UpdateTimes(task *ProvingTask, udpateTimes int) error {
|
||||
task.Times = udpateTimes
|
||||
byt, err := json.Marshal(task)
|
||||
@@ -1,19 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"scroll-tech/common/cmd"
|
||||
"scroll-tech/common/version"
|
||||
)
|
||||
|
||||
func TestRunRoller(t *testing.T) {
|
||||
roller := cmd.NewCmd("roller-test", "--version")
|
||||
defer roller.WaitExit()
|
||||
|
||||
// wait result
|
||||
roller.ExpectWithTimeout(t, true, time.Second*3, fmt.Sprintf("roller version %s", version.Version))
|
||||
roller.RunApp(nil)
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
//go:build mock_prover
|
||||
|
||||
package prover
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/core/types"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
"scroll-tech/roller/config"
|
||||
)
|
||||
|
||||
// Prover sends block-traces to rust-prover through socket and get back the zk-proof.
|
||||
type Prover struct {
|
||||
cfg *config.ProverConfig
|
||||
}
|
||||
|
||||
// NewProver inits a Prover object.
|
||||
func NewProver(cfg *config.ProverConfig) (*Prover, error) {
|
||||
return &Prover{cfg: cfg}, nil
|
||||
}
|
||||
|
||||
// Prove call rust ffi to generate proof, if first failed, try again.
|
||||
func (p *Prover) Prove(taskID string, traces []*types.BlockTrace) (*message.AggProof, error) {
|
||||
_empty := common.BigToHash(big.NewInt(0))
|
||||
return &message.AggProof{
|
||||
Proof: _empty[:],
|
||||
Instance: _empty[:],
|
||||
FinalPair: _empty[:],
|
||||
}, nil
|
||||
}
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
|
||||
"scroll-tech/common/docker"
|
||||
|
||||
rapp "scroll-tech/roller/cmd/app"
|
||||
rapp "scroll-tech/prover/cmd/app"
|
||||
|
||||
"scroll-tech/database/migrate"
|
||||
|
||||
@@ -27,18 +27,18 @@ var (
|
||||
base *docker.App
|
||||
bridgeApp *bcmd.MockApp
|
||||
coordinatorApp *capp.CoordinatorApp
|
||||
rollerApp *rapp.RollerApp
|
||||
proverApp *rapp.ProverApp
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
base = docker.NewDockerApp()
|
||||
bridgeApp = bcmd.NewBridgeApp(base, "../../bridge/conf/config.json")
|
||||
coordinatorApp = capp.NewCoordinatorApp(base, "../../coordinator/conf/config.json")
|
||||
rollerApp = rapp.NewRollerApp(base, "../../roller/config.json", coordinatorApp.WSEndpoint())
|
||||
proverApp = rapp.NewProverApp(base, "../../prover/conf/config.json", coordinatorApp.WSEndpoint())
|
||||
m.Run()
|
||||
bridgeApp.Free()
|
||||
coordinatorApp.Free()
|
||||
rollerApp.Free()
|
||||
proverApp.Free()
|
||||
base.Free()
|
||||
}
|
||||
|
||||
@@ -50,11 +50,11 @@ func TestStartProcess(t *testing.T) {
|
||||
|
||||
// Run coordinator app.
|
||||
coordinatorApp.RunApp(t)
|
||||
// Run roller app.
|
||||
rollerApp.RunApp(t)
|
||||
// Run prover app.
|
||||
proverApp.RunApp(t)
|
||||
|
||||
// Free apps.
|
||||
rollerApp.WaitExit()
|
||||
proverApp.WaitExit()
|
||||
coordinatorApp.WaitExit()
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ func TestMonitorMetrics(t *testing.T) {
|
||||
bodyStr := string(body)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
assert.Equal(t, true, strings.Contains(bodyStr, "coordinator_sessions_timeout_total"))
|
||||
assert.Equal(t, true, strings.Contains(bodyStr, "coordinator_rollers_disconnects_total"))
|
||||
assert.Equal(t, true, strings.Contains(bodyStr, "coordinator_provers_disconnects_total"))
|
||||
|
||||
// Exit.
|
||||
coordinatorApp.WaitExit()
|
||||
|
||||
Reference in New Issue
Block a user