mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
Compare commits
72 Commits
feat/ai_he
...
coordinato
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6783b411a | ||
|
|
409bfb8b26 | ||
|
|
47c7007b64 | ||
|
|
dffab6783b | ||
|
|
21ceaf6056 | ||
|
|
feeac8f57d | ||
|
|
b1094073bb | ||
|
|
3a871717db | ||
|
|
2c14342df3 | ||
|
|
14e2633ba3 | ||
|
|
21326c25e6 | ||
|
|
9c2bc02f64 | ||
|
|
9e5579c4cb | ||
|
|
ac4a72003c | ||
|
|
19447984bd | ||
|
|
d66d705456 | ||
|
|
c938d6c25e | ||
|
|
cf9e3680c0 | ||
|
|
e9470ff7a5 | ||
|
|
51b1e79b31 | ||
|
|
c22d9ecad1 | ||
|
|
e7551650b2 | ||
|
|
20fde41be8 | ||
|
|
4df1dd8acd | ||
|
|
6696aac16a | ||
|
|
4b79e63c9b | ||
|
|
ac0396db3c | ||
|
|
17e6c5b7ac | ||
|
|
b6e33456fa | ||
|
|
7572bf8923 | ||
|
|
5d41788b07 | ||
|
|
8f8a537fba | ||
|
|
b1c3a4ecc0 | ||
|
|
d9a29cddce | ||
|
|
c992157eb4 | ||
|
|
404c664e10 | ||
|
|
8a15836d20 | ||
|
|
4365aafa9a | ||
|
|
6ee026fa16 | ||
|
|
c79ad57fb7 | ||
|
|
fa5b113248 | ||
|
|
884b050866 | ||
|
|
1d9fa41535 | ||
|
|
b7f23c6734 | ||
|
|
057e22072c | ||
|
|
c7b83a0784 | ||
|
|
92ca7a6b76 | ||
|
|
256c90af6f | ||
|
|
50f3e1a97c | ||
|
|
2721503657 | ||
|
|
a04b64df03 | ||
|
|
78dbe6cde1 | ||
|
|
9df6429d98 | ||
|
|
e6be62f633 | ||
|
|
c72ee5d679 | ||
|
|
4725d8a73c | ||
|
|
322766f54f | ||
|
|
5614ec3b86 | ||
|
|
5a07a1652b | ||
|
|
64ef0f4ec0 | ||
|
|
321dd43af8 | ||
|
|
624a7a29b8 | ||
|
|
4f878d9231 | ||
|
|
7b3a65b35b | ||
|
|
0d238d77a6 | ||
|
|
76ecdf064a | ||
|
|
5c6c225f76 | ||
|
|
3adb2e0a1b | ||
|
|
412ad56a64 | ||
|
|
9796d16f6c | ||
|
|
1f2b857671 | ||
|
|
5dbb5c5fb7 |
@@ -1,16 +0,0 @@
|
||||
{
|
||||
"$schema": "https://json.schemastore.org/claude-code-settings.json",
|
||||
"env": {},
|
||||
"companyAnnouncements": [
|
||||
"Welcome! Here is scroll-tech",
|
||||
"Just ask me about what can help"
|
||||
],
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(pwd)",
|
||||
"Bash(ls *)",
|
||||
"Bash(cat *)"
|
||||
],
|
||||
"deny": []
|
||||
}
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
---
|
||||
name: db-query
|
||||
description: Do query from database for common task
|
||||
model: sonnet
|
||||
allowed-tools: Bash(psql *)
|
||||
---
|
||||
|
||||
User could like to know about the status of L2 data blocks and proving task, following is their request:
|
||||
|
||||
$ARGUMENTS
|
||||
|
||||
(If you find there is nothing in the request above, just tell "nothing to do" and stop)
|
||||
|
||||
You should have known the data sheme of our database, if not yet, read it from the `.sql` files under `database/migrate/migrations`.
|
||||
|
||||
According to use's request, generate the corresponding SQL expression and query the database. For example, if user ask "list the assigned chunks", it means "query records from `chunk` table with proving_status=2 (assigned)", or the SQL expression 'SELECT * from chunk where proving_status=2;'. If it is not clear, you can ask user which col they are indicating to, and list some possible options.
|
||||
|
||||
For the generated SQL, following rules MUST be obey:
|
||||
|
||||
+ Limit the number of records to 20, unless user has a specification explicitly like "show me ALL chunks".
|
||||
+ Following cols can not be read by human and contain very large texts, they MUST be excluded in the SQL expression:
|
||||
+ For all table, any col named "proof"
|
||||
+ "header" and "transactions" in `l2_block` table
|
||||
+ "calldata" in `l1_message`
|
||||
+ Always omit the `deleted_at` col, never include them in query or use in where condition
|
||||
+ Without explicit specification, the records should be ordered by the `updated_at` col, the most recent one first.
|
||||
|
||||
When you has decided the SQL expression, always print it out.
|
||||
|
||||
You use psql client to query from our PostgreSQL db. When launching psql, always with "-w" options, and use "-o" to send all ouput to `query_report.txt` file under system's temporary dir, like /tmp. You MUST NOT read the generated report.
|
||||
|
||||
If the psql failed since authentication, remind user to prepare their `.pgpass` file under home dir.
|
||||
|
||||
You should have known the endpoint of the database before, in the form of PostgreSQL DSN. If not, try to read it from the `db.dsn` field inside of `coordinator/build/bin/conf/config.json`. If still not able to get the data, ask via Ask User Question to get the endpoint.
|
||||
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
## Notes for handling ProverE2E
|
||||
|
||||
+ Ensure the `conf` dir has been correctly linked and remind user which path it currently links to.
|
||||
|
||||
+ If some files are instructed to be generated, but they have been existed, NEVER refer the content before the generation. They may be left from different setup and contain wrong message for current process.
|
||||
|
||||
+ In step 4, if the `l2.validium_mode` is set to true, MUST Ask User for decryption key to fill the `sequencer.decryption_key` field. The key must be a hex string WITHOUT "0x" prefix.
|
||||
@@ -1,62 +0,0 @@
|
||||
---
|
||||
name: integration-test-helper
|
||||
description: Assist with the process described in the specified directory to prepare or advance integration tests. The target directory and instruction section can be specified, like "tests/prover-e2e test".
|
||||
model: sonnet
|
||||
allowed-tools: Bash(make *), Bash(tee *), Bash(jq *)
|
||||
---
|
||||
|
||||
This skill helps launching the full process described in the instructions, also investigate and report the results.
|
||||
|
||||
## Target directory
|
||||
|
||||
The **target directory** under which the setup process being run is: $ARGUMENTS[0].
|
||||
Under the target dir there are the stuff and instructions. If the target dir above is empty, just use !`pwd`.
|
||||
|
||||
## Instructions
|
||||
|
||||
First read `README.md` under target directory, instructions should be under heading named ($ARGUMENTS[1]). If there is no such a heading name, just try the "Test" heading.
|
||||
|
||||
In additional, there are two optional places for more knowledge about current instructions:
|
||||
|
||||
+ An .md file under current skill dir, named from the top header of the `README.md` file or the name of target directory.
|
||||
For example, if the target dir is `tests/prover-e2e`, the top header in `README.md` has "ProverE2E", so there may be a .md file named as `prover-e2e.md` or `ProverE2E.md`
|
||||
|
||||
+ All files under `experience` path (if it existed) of target dir contains additional experience, which is specialized for current host
|
||||
|
||||
## Run each step listed in instructions
|
||||
|
||||
The instructions often contain multiple steps which should be completed in sequence. Following are some rules MUST be obey while handling each step:
|
||||
|
||||
### "Must do" while executing commands in steps
|
||||
|
||||
Any command mentioned in steps should be executed by Bash tool, with following MUST DO for handling the outputs:
|
||||
|
||||
+ Use "| tee <log_file>" to capture output of bash tool into local file for investigating later. The file name of log should be in format as `<desc_of_ccommand>_<day>_<time>.log`
|
||||
+ Do not read all output, after "| tee", use "|tail -n 50" to only catch the possible error message. That should be enough for common case.
|
||||
|
||||
It may need to jump to other directories for executing a step. We MUST go back to target directory after every step has been completed. Also, DO NOT change anything outside of target directy.
|
||||
|
||||
### When error raised
|
||||
Command execution should get success return. If error raised while executing, do following process:
|
||||
|
||||
1. Try to analysis the reason of error, first from the caught error message. If there is no enough data, grep useful information from the log file of whole output just captured.
|
||||
|
||||
2. Ask User for next action, options are:
|
||||
+ Retry with resolution derived from error analyst
|
||||
+ Retry, with user provide tips to resolve the issue
|
||||
+ Just retry, user has resolved the issue by theirself
|
||||
+ Stop here, discard current and following steps, do after completion
|
||||
|
||||
Error often caused by some mismacthing of configruation in current host. Here are some tips which may help:
|
||||
|
||||
* Install the missing tools / libs via packet manager
|
||||
* Fix the typo, or complete missed fields in configuration files
|
||||
* Copy missed files, it may be just put in some place of the project or can be downloaded according to some documents.
|
||||
|
||||
## After completion
|
||||
|
||||
When every step has done, or the process stop by user, make following materials before stop:
|
||||
|
||||
+ Package all log files generated before into a tarball and save it in tempoaray path. Then clear all log files.
|
||||
+ Generate a report file under target directory, with file name like `report_<day>_<time>.txt`.
|
||||
+ For steps once failed and being resolved later, record the resolution into a file under `experience` path in target dir.
|
||||
43
.github/workflows/docker.yml
vendored
43
.github/workflows/docker.yml
vendored
@@ -360,6 +360,49 @@ jobs:
|
||||
scrolltech/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
|
||||
${{ env.ECR_REGISTRY }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
|
||||
|
||||
coordinator-proxy:
|
||||
runs-on:
|
||||
group: scroll-reth-runner-group
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v2
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
- name: Login to Docker Hub
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
- name: Configure AWS credentials
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
aws-region: ${{ env.AWS_REGION }}
|
||||
- name: Login to Amazon ECR
|
||||
id: login-ecr
|
||||
uses: aws-actions/amazon-ecr-login@v2
|
||||
- name: check repo and create it if not exist
|
||||
env:
|
||||
REPOSITORY: coordinator-proxy
|
||||
run: |
|
||||
aws --region ${{ env.AWS_REGION }} ecr describe-repositories --repository-names ${{ env.REPOSITORY }} && : || aws --region ${{ env.AWS_REGION }} ecr create-repository --repository-name ${{ env.REPOSITORY }}
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v3
|
||||
env:
|
||||
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
|
||||
REPOSITORY: coordinator-proxy
|
||||
IMAGE_TAG: ${{ github.ref_name }}
|
||||
with:
|
||||
context: .
|
||||
file: ./build/dockerfiles/coordinator-proxy.Dockerfile
|
||||
push: true
|
||||
tags: |
|
||||
scrolltech/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
|
||||
${{ env.ECR_REGISTRY }}/${{ env.REPOSITORY }}:${{ env.IMAGE_TAG }}
|
||||
|
||||
coordinator-cron:
|
||||
runs-on:
|
||||
group: scroll-reth-runner-group
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -23,8 +23,5 @@ coverage.txt
|
||||
sftp-config.json
|
||||
*~
|
||||
|
||||
# AI skills
|
||||
**/experience
|
||||
|
||||
target
|
||||
zkvm-prover/config.json
|
||||
@@ -1,7 +0,0 @@
|
||||
The mono repo for scroll-tech's services. See @README.md to know about the project.
|
||||
|
||||
Skills has been set to help some process being handled easily. When asked by "what can you help", list following skills, along with the skill-description and invoke cost estimation here:
|
||||
|
||||
1. `db-query`: ~$0.1 per query
|
||||
2. `integration-test-helper` Now ready for following target:
|
||||
+ `tests/prover-e2e`: ~$1.0 per process
|
||||
26
build/dockerfiles/coordinator-proxy.Dockerfile
Normal file
26
build/dockerfiles/coordinator-proxy.Dockerfile
Normal file
@@ -0,0 +1,26 @@
|
||||
# Download Go dependencies
|
||||
FROM scrolltech/go-rust-builder:go-1.22.12-rust-nightly-2025-02-14 as base
|
||||
WORKDIR /src
|
||||
COPY go.work* ./
|
||||
COPY ./rollup/go.* ./rollup/
|
||||
COPY ./common/go.* ./common/
|
||||
COPY ./coordinator/go.* ./coordinator/
|
||||
COPY ./database/go.* ./database/
|
||||
COPY ./tests/integration-test/go.* ./tests/integration-test/
|
||||
COPY ./bridge-history-api/go.* ./bridge-history-api/
|
||||
RUN go mod download -x
|
||||
|
||||
|
||||
# Build coordinator proxy
|
||||
FROM base as builder
|
||||
COPY . .
|
||||
RUN cd ./coordinator && CGO_LDFLAGS="-Wl,--no-as-needed -ldl" make coordinator_proxy && mv ./build/bin/coordinator_proxy /bin/coordinator_proxy
|
||||
|
||||
# Pull coordinator proxy into a second stage deploy ubuntu container
|
||||
FROM ubuntu:20.04
|
||||
ENV CGO_LDFLAGS="-Wl,--no-as-needed -ldl"
|
||||
RUN apt update && apt install vim netcat-openbsd net-tools curl jq -y
|
||||
COPY --from=builder /bin/coordinator_proxy /bin/
|
||||
RUN /bin/coordinator_proxy --version
|
||||
WORKDIR /app
|
||||
ENTRYPOINT ["/bin/coordinator_proxy"]
|
||||
@@ -0,0 +1,8 @@
|
||||
assets/
|
||||
contracts/
|
||||
docs/
|
||||
l2geth/
|
||||
rpc-gateway/
|
||||
*target/*
|
||||
|
||||
permissionless-batches/conf/
|
||||
@@ -12,6 +12,7 @@ require (
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/mattn/go-colorable v0.1.13
|
||||
github.com/mattn/go-isatty v0.0.20
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/modern-go/reflect2 v1.0.2
|
||||
github.com/orcaman/concurrent-map v1.0.0
|
||||
github.com/prometheus/client_golang v1.19.0
|
||||
@@ -147,7 +148,6 @@ require (
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
|
||||
github.com/miekg/pkcs11 v1.1.1 // indirect
|
||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mitchellh/pointerstructure v1.2.0 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||
github.com/mmcloughlin/addchain v0.4.0 // indirect
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/utils"
|
||||
)
|
||||
|
||||
@@ -33,9 +34,17 @@ func Server(c *cli.Context, db *gorm.DB) {
|
||||
promhttp.Handler().ServeHTTP(context.Writer, context.Request)
|
||||
})
|
||||
|
||||
probeController := NewProbesController(db)
|
||||
r.GET("/health", probeController.HealthCheck)
|
||||
r.GET("/ready", probeController.Ready)
|
||||
if db != nil {
|
||||
probeController := NewProbesController(db)
|
||||
r.GET("/health", probeController.HealthCheck)
|
||||
r.GET("/ready", probeController.Ready)
|
||||
} else {
|
||||
dummyOk := func(context *gin.Context) {
|
||||
types.RenderSuccess(context, nil)
|
||||
}
|
||||
r.GET("/health", dummyOk)
|
||||
r.GET("/ready", dummyOk)
|
||||
}
|
||||
|
||||
address := fmt.Sprintf(":%s", c.String(utils.MetricsPort.Name))
|
||||
server := &http.Server{
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
// Response the response schema
|
||||
@@ -13,6 +14,19 @@ type Response struct {
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
func (resp *Response) DecodeData(out interface{}) error {
|
||||
// Decode generically unmarshaled JSON (map[string]any, []any) into a typed struct
|
||||
// honoring `json` tags and allowing weak type conversions.
|
||||
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||
TagName: "json",
|
||||
Result: out,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return dec.Decode(resp.Data)
|
||||
}
|
||||
|
||||
// RenderJSON renders response with json
|
||||
func RenderJSON(ctx *gin.Context, errCode int, err error, data interface{}) {
|
||||
var errMsg string
|
||||
|
||||
@@ -34,7 +34,11 @@ coordinator_cron:
|
||||
coordinator_tool:
|
||||
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -o $(PWD)/build/bin/coordinator_tool ./cmd/tool
|
||||
|
||||
localsetup: libzkp coordinator_api ## Local setup: build coordinator_api, copy config, and setup releases
|
||||
coordinator_proxy:
|
||||
go build -ldflags "-X scroll-tech/common/version.ZkVersion=${ZK_VERSION}" -tags="mock_prover mock_verifier" -o $(PWD)/build/bin/coordinator_proxy ./cmd/proxy
|
||||
|
||||
|
||||
localsetup: coordinator_api ## Local setup: build coordinator_api, copy config, and setup releases
|
||||
mkdir -p build/bin/conf
|
||||
@echo "Copying configuration files..."
|
||||
@if [ -f "$(PWD)/conf/config.template.json" ]; then \
|
||||
|
||||
122
coordinator/cmd/proxy/app/app.go
Normal file
122
coordinator/cmd/proxy/app/app.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/database"
|
||||
"scroll-tech/common/observability"
|
||||
"scroll-tech/common/utils"
|
||||
"scroll-tech/common/version"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/proxy"
|
||||
"scroll-tech/coordinator/internal/route"
|
||||
)
|
||||
|
||||
var app *cli.App
|
||||
|
||||
func init() {
|
||||
// Set up coordinator app info.
|
||||
app = cli.NewApp()
|
||||
app.Action = action
|
||||
app.Name = "coordinator proxy"
|
||||
app.Usage = "Proxy for multiple Scroll L2 Coordinators"
|
||||
app.Version = version.Version
|
||||
app.Flags = append(app.Flags, utils.CommonFlags...)
|
||||
app.Flags = append(app.Flags, apiFlags...)
|
||||
app.Before = func(ctx *cli.Context) error {
|
||||
return utils.LogSetup(ctx)
|
||||
}
|
||||
// Register `coordinator-test` app for integration-test.
|
||||
utils.RegisterSimulation(app, utils.CoordinatorAPIApp)
|
||||
}
|
||||
|
||||
func action(ctx *cli.Context) error {
|
||||
cfgFile := ctx.String(utils.ConfigFileFlag.Name)
|
||||
cfg, err := config.NewProxyConfig(cfgFile)
|
||||
if err != nil {
|
||||
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
|
||||
}
|
||||
|
||||
var db *gorm.DB
|
||||
if dbCfg := cfg.ProxyManager.DB; dbCfg != nil {
|
||||
log.Info("Apply persistent storage")
|
||||
db, err = database.InitDB(cfg.ProxyManager.DB)
|
||||
if err != nil {
|
||||
log.Crit("failed to init db connection", "err", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = database.CloseDB(db); err != nil {
|
||||
log.Error("can not close db connection", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
registry := prometheus.DefaultRegisterer
|
||||
observability.Server(ctx, db)
|
||||
|
||||
apiSrv := server(ctx, cfg, db, registry)
|
||||
|
||||
log.Info(
|
||||
"Start coordinator api successfully.",
|
||||
"version", version.Version,
|
||||
)
|
||||
|
||||
// Catch CTRL-C to ensure a graceful shutdown.
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
signal.Notify(interrupt, os.Interrupt)
|
||||
|
||||
// Wait until the interrupt signal is received from an OS signal.
|
||||
<-interrupt
|
||||
log.Info("start shutdown coordinator proxy server ...")
|
||||
|
||||
closeCtx, cancelExit := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancelExit()
|
||||
if err = apiSrv.Shutdown(closeCtx); err != nil {
|
||||
log.Warn("shutdown coordinator proxy server failure", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
<-closeCtx.Done()
|
||||
log.Info("coordinator proxy server exiting success")
|
||||
return nil
|
||||
}
|
||||
|
||||
func server(ctx *cli.Context, cfg *config.ProxyConfig, db *gorm.DB, reg prometheus.Registerer) *http.Server {
|
||||
router := gin.New()
|
||||
proxy.InitController(cfg, db, reg)
|
||||
route.ProxyRoute(router, cfg, reg)
|
||||
port := ctx.String(httpPortFlag.Name)
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf(":%s", port),
|
||||
Handler: router,
|
||||
ReadHeaderTimeout: time.Minute,
|
||||
}
|
||||
|
||||
go func() {
|
||||
if runServerErr := srv.ListenAndServe(); runServerErr != nil && !errors.Is(runServerErr, http.ErrServerClosed) {
|
||||
log.Crit("run coordinator proxy http server failure", "error", runServerErr)
|
||||
}
|
||||
}()
|
||||
return srv
|
||||
}
|
||||
|
||||
// Run coordinator.
|
||||
func Run() {
|
||||
// RunApp the coordinator.
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
_, _ = fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
30
coordinator/cmd/proxy/app/flags.go
Normal file
30
coordinator/cmd/proxy/app/flags.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package app
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
|
||||
var (
|
||||
apiFlags = []cli.Flag{
|
||||
// http flags
|
||||
&httpEnabledFlag,
|
||||
&httpListenAddrFlag,
|
||||
&httpPortFlag,
|
||||
}
|
||||
// httpEnabledFlag enable rpc server.
|
||||
httpEnabledFlag = cli.BoolFlag{
|
||||
Name: "http",
|
||||
Usage: "Enable the HTTP-RPC server",
|
||||
Value: false,
|
||||
}
|
||||
// httpListenAddrFlag set the http address.
|
||||
httpListenAddrFlag = cli.StringFlag{
|
||||
Name: "http.addr",
|
||||
Usage: "HTTP-RPC server listening interface",
|
||||
Value: "localhost",
|
||||
}
|
||||
// httpPortFlag set http.port.
|
||||
httpPortFlag = cli.IntFlag{
|
||||
Name: "http.port",
|
||||
Usage: "HTTP-RPC server listening port",
|
||||
Value: 8590,
|
||||
}
|
||||
)
|
||||
7
coordinator/cmd/proxy/main.go
Normal file
7
coordinator/cmd/proxy/main.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package main
|
||||
|
||||
import "scroll-tech/coordinator/cmd/proxy/app"
|
||||
|
||||
func main() {
|
||||
app.Run()
|
||||
}
|
||||
31
coordinator/conf/config_proxy.json
Normal file
31
coordinator/conf/config_proxy.json
Normal file
@@ -0,0 +1,31 @@
|
||||
{
|
||||
"proxy_manager": {
|
||||
"proxy_cli": {
|
||||
"proxy_name": "proxy_name",
|
||||
"secret": "client private key"
|
||||
},
|
||||
"auth": {
|
||||
"secret": "proxy secret key",
|
||||
"challenge_expire_duration_sec": 3600,
|
||||
"login_expire_duration_sec": 3600
|
||||
},
|
||||
"verifier": {
|
||||
"min_prover_version": "v4.4.45",
|
||||
"verifiers": []
|
||||
},
|
||||
"db": {
|
||||
"driver_name": "postgres",
|
||||
"dsn": "postgres://localhost/scroll?sslmode=disable",
|
||||
"maxOpenNum": 200,
|
||||
"maxIdleNum": 20
|
||||
}
|
||||
},
|
||||
"coordinators": {
|
||||
"sepolia": {
|
||||
"base_url": "http://localhost:8555",
|
||||
"retry_count": 10,
|
||||
"retry_wait_time_sec": 10,
|
||||
"connection_timeout_sec": 30
|
||||
}
|
||||
}
|
||||
}
|
||||
74
coordinator/internal/config/proxy_config.go
Normal file
74
coordinator/internal/config/proxy_config.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"scroll-tech/common/database"
|
||||
"scroll-tech/common/utils"
|
||||
)
|
||||
|
||||
// Proxy loads proxy configuration items.
|
||||
type ProxyManager struct {
|
||||
// Zk verifier config help to confine the connected prover.
|
||||
Verifier *VerifierConfig `json:"verifier"`
|
||||
Client *ProxyClient `json:"proxy_cli"`
|
||||
Auth *Auth `json:"auth"`
|
||||
DB *database.Config `json:"db,omitempty"`
|
||||
}
|
||||
|
||||
func (m *ProxyManager) Normalize() {
|
||||
if m.Client.Secret == "" {
|
||||
m.Client.Secret = m.Auth.Secret
|
||||
}
|
||||
|
||||
if m.Client.ProxyVersion == "" {
|
||||
m.Client.ProxyVersion = m.Verifier.MinProverVersion
|
||||
}
|
||||
}
|
||||
|
||||
// Proxy client configuration for connect to upstream as a client
|
||||
type ProxyClient struct {
|
||||
ProxyName string `json:"proxy_name"`
|
||||
ProxyVersion string `json:"proxy_version,omitempty"`
|
||||
Secret string `json:"secret,omitempty"`
|
||||
}
|
||||
|
||||
// Coordinator configuration
|
||||
type UpStream struct {
|
||||
BaseUrl string `json:"base_url"`
|
||||
RetryCount uint `json:"retry_count"`
|
||||
RetryWaitTime uint `json:"retry_wait_time_sec"`
|
||||
ConnectionTimeoutSec uint `json:"connection_timeout_sec"`
|
||||
CompatibileMode bool `json:"compatible_mode,omitempty"`
|
||||
}
|
||||
|
||||
// Config load configuration items.
|
||||
type ProxyConfig struct {
|
||||
ProxyManager *ProxyManager `json:"proxy_manager"`
|
||||
ProxyName string `json:"proxy_name"`
|
||||
Coordinators map[string]*UpStream `json:"coordinators"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new instance of Config.
|
||||
func NewProxyConfig(file string) (*ProxyConfig, error) {
|
||||
buf, err := os.ReadFile(filepath.Clean(file))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &ProxyConfig{}
|
||||
err = json.Unmarshal(buf, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Override config with environment variables
|
||||
err = utils.OverrideConfigWithEnv(cfg, "SCROLL_COORDINATOR_PROXY")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
@@ -19,28 +19,56 @@ type AuthController struct {
|
||||
loginLogic *auth.LoginLogic
|
||||
}
|
||||
|
||||
// NewAuthController returns an LoginController instance
|
||||
func NewAuthController(db *gorm.DB, cfg *config.Config, vf *verifier.Verifier) *AuthController {
|
||||
func NewAuthControllerWithLogic(loginLogic *auth.LoginLogic) *AuthController {
|
||||
return &AuthController{
|
||||
loginLogic: auth.NewLoginLogic(db, cfg, vf),
|
||||
loginLogic: loginLogic,
|
||||
}
|
||||
}
|
||||
|
||||
// Login the api controller for login
|
||||
// NewAuthController returns an LoginController instance
|
||||
func NewAuthController(db *gorm.DB, cfg *config.Config, vf *verifier.Verifier) *AuthController {
|
||||
return &AuthController{
|
||||
loginLogic: auth.NewLoginLogic(db, cfg.ProverManager.Verifier, vf),
|
||||
}
|
||||
}
|
||||
|
||||
// Login the api controller for login, used as the Authenticator in JWT
|
||||
// It can work in two mode: full process for normal login, or if login request
|
||||
// is posted from proxy, run a simpler process to login a client
|
||||
func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
|
||||
|
||||
// check if the login is post by proxy
|
||||
var viaProxy bool
|
||||
if proverType, proverTypeExist := c.Get(types.ProverProviderTypeKey); proverTypeExist {
|
||||
proverType := uint8(proverType.(float64))
|
||||
viaProxy = proverType == types.ProverProviderTypeProxy
|
||||
}
|
||||
|
||||
var login types.LoginParameter
|
||||
if err := c.ShouldBind(&login); err != nil {
|
||||
return "", fmt.Errorf("missing the public_key, err:%w", err)
|
||||
}
|
||||
|
||||
// check login parameter's token is equal to bearer token, the Authorization must be existed
|
||||
// if not exist, the jwt token will intercept it
|
||||
brearToken := c.GetHeader("Authorization")
|
||||
if brearToken != "Bearer "+login.Message.Challenge {
|
||||
return "", errors.New("check challenge failure for the not equal challenge string")
|
||||
// if not, process with normal login
|
||||
if !viaProxy {
|
||||
// check login parameter's token is equal to bearer token, the Authorization must be existed
|
||||
// if not exist, the jwt token will intercept it
|
||||
brearToken := c.GetHeader("Authorization")
|
||||
if brearToken != "Bearer "+login.Message.Challenge {
|
||||
return "", errors.New("check challenge failure for the not equal challenge string")
|
||||
}
|
||||
|
||||
if err := auth.VerifyMsg(&login); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// check the challenge is used, if used, return failure
|
||||
if err := a.loginLogic.InsertChallengeString(c, login.Message.Challenge); err != nil {
|
||||
return "", fmt.Errorf("login insert challenge string failure:%w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.loginLogic.Check(&login); err != nil {
|
||||
if err := a.loginLogic.CompatiblityCheck(&login); err != nil {
|
||||
return "", fmt.Errorf("check the login parameter failure: %w", err)
|
||||
}
|
||||
|
||||
@@ -49,11 +77,6 @@ func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
|
||||
return "", fmt.Errorf("prover hard fork name failure:%w", err)
|
||||
}
|
||||
|
||||
// check the challenge is used, if used, return failure
|
||||
if err := a.loginLogic.InsertChallengeString(c, login.Message.Challenge); err != nil {
|
||||
return "", fmt.Errorf("login insert challenge string failure:%w", err)
|
||||
}
|
||||
|
||||
returnData := types.LoginParameterWithHardForkName{
|
||||
HardForkName: hardForkNames,
|
||||
LoginParameter: login,
|
||||
@@ -85,10 +108,6 @@ func (a *AuthController) IdentityHandler(c *gin.Context) interface{} {
|
||||
c.Set(types.ProverName, proverName)
|
||||
}
|
||||
|
||||
if publicKey, ok := claims[types.PublicKey]; ok {
|
||||
c.Set(types.PublicKey, publicKey)
|
||||
}
|
||||
|
||||
if proverVersion, ok := claims[types.ProverVersion]; ok {
|
||||
c.Set(types.ProverVersion, proverVersion)
|
||||
}
|
||||
@@ -101,5 +120,9 @@ func (a *AuthController) IdentityHandler(c *gin.Context) interface{} {
|
||||
c.Set(types.ProverProviderTypeKey, providerType)
|
||||
}
|
||||
|
||||
if publicKey, ok := claims[types.PublicKey]; ok {
|
||||
return publicKey
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
150
coordinator/internal/controller/proxy/auth.go
Normal file
150
coordinator/internal/controller/proxy/auth.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
jwt "github.com/appleboy/gin-jwt/v2"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/logic/auth"
|
||||
"scroll-tech/coordinator/internal/logic/verifier"
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
// AuthController is login API
|
||||
type AuthController struct {
|
||||
apiLogin *api.AuthController
|
||||
clients Clients
|
||||
proverMgr *ProverManager
|
||||
}
|
||||
|
||||
const upstreamConnTimeout = time.Second * 5
|
||||
const LoginParamCache = "login_param"
|
||||
const ProverTypesKey = "prover_types"
|
||||
const SignatureKey = "prover_signature"
|
||||
|
||||
// NewAuthController returns an LoginController instance
|
||||
func NewAuthController(cfg *config.ProxyConfig, clients Clients, proverMgr *ProverManager) *AuthController {
|
||||
|
||||
// use a dummy Verifier to create login logic (we do not use any information in verifier)
|
||||
dummyVf := verifier.Verifier{
|
||||
OpenVMVkMap: make(map[string]struct{}),
|
||||
}
|
||||
loginLogic := auth.NewLoginLogicWithSimpleDeduplicator(cfg.ProxyManager.Verifier, &dummyVf)
|
||||
|
||||
authController := &AuthController{
|
||||
apiLogin: api.NewAuthControllerWithLogic(loginLogic),
|
||||
clients: clients,
|
||||
proverMgr: proverMgr,
|
||||
}
|
||||
|
||||
return authController
|
||||
}
|
||||
|
||||
// Login extended the Login hander in api controller
|
||||
func (a *AuthController) Login(c *gin.Context) (interface{}, error) {
|
||||
|
||||
loginRes, err := a.apiLogin.Login(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
loginParam := loginRes.(types.LoginParameterWithHardForkName)
|
||||
|
||||
if loginParam.LoginParameter.Message.ProverProviderType == types.ProverProviderTypeProxy {
|
||||
return nil, fmt.Errorf("proxy do not support recursive login")
|
||||
}
|
||||
|
||||
session := a.proverMgr.GetOrCreate(loginParam.PublicKey)
|
||||
log.Debug("start handling login", "cli", loginParam.Message.ProverName)
|
||||
|
||||
loginCtx, cf := context.WithTimeout(context.Background(), upstreamConnTimeout)
|
||||
var wg sync.WaitGroup
|
||||
for _, cli := range a.clients {
|
||||
wg.Add(1)
|
||||
go func(cli Client) {
|
||||
defer wg.Done()
|
||||
if err := session.ProxyLogin(loginCtx, cli, &loginParam.LoginParameter); err != nil {
|
||||
log.Error("proxy login failed during token cache update",
|
||||
"userKey", loginParam.PublicKey,
|
||||
"upstream", cli.Name(),
|
||||
"error", err)
|
||||
}
|
||||
}(cli)
|
||||
}
|
||||
go func(cliName string) {
|
||||
wg.Wait()
|
||||
cf()
|
||||
log.Debug("first login attempt has completed", "cli", cliName)
|
||||
}(loginParam.Message.ProverName)
|
||||
|
||||
return loginParam.LoginParameter, nil
|
||||
}
|
||||
|
||||
// PayloadFunc returns jwt.MapClaims with {public key, prover name}.
|
||||
func (a *AuthController) PayloadFunc(data interface{}) jwt.MapClaims {
|
||||
v, ok := data.(types.LoginParameter)
|
||||
if !ok {
|
||||
log.Error("PayloadFunc received unexpected type", "type", fmt.Sprintf("%T", data))
|
||||
return jwt.MapClaims{}
|
||||
}
|
||||
|
||||
return jwt.MapClaims{
|
||||
types.PublicKey: v.PublicKey,
|
||||
types.ProverName: v.Message.ProverName,
|
||||
types.ProverVersion: v.Message.ProverVersion,
|
||||
types.ProverProviderTypeKey: v.Message.ProverProviderType,
|
||||
SignatureKey: v.Signature,
|
||||
ProverTypesKey: v.Message.ProverTypes,
|
||||
}
|
||||
}
|
||||
|
||||
// IdentityHandler replies to client for /login
|
||||
func (a *AuthController) IdentityHandler(c *gin.Context) interface{} {
|
||||
claims := jwt.ExtractClaims(c)
|
||||
loginParam := &types.LoginParameter{}
|
||||
|
||||
if proverName, ok := claims[types.ProverName]; ok {
|
||||
loginParam.Message.ProverName, _ = proverName.(string)
|
||||
}
|
||||
|
||||
if proverVersion, ok := claims[types.ProverVersion]; ok {
|
||||
loginParam.Message.ProverVersion, _ = proverVersion.(string)
|
||||
}
|
||||
|
||||
if providerType, ok := claims[types.ProverProviderTypeKey]; ok {
|
||||
num, _ := providerType.(float64)
|
||||
loginParam.Message.ProverProviderType = types.ProverProviderType(num)
|
||||
}
|
||||
|
||||
if signature, ok := claims[SignatureKey]; ok {
|
||||
loginParam.Signature, _ = signature.(string)
|
||||
}
|
||||
|
||||
if proverTypes, ok := claims[ProverTypesKey]; ok {
|
||||
arr, _ := proverTypes.([]any)
|
||||
for _, elm := range arr {
|
||||
num, _ := elm.(float64)
|
||||
loginParam.Message.ProverTypes = append(loginParam.Message.ProverTypes, types.ProverType(num))
|
||||
}
|
||||
}
|
||||
|
||||
if publicKey, ok := claims[types.PublicKey]; ok {
|
||||
loginParam.PublicKey, _ = publicKey.(string)
|
||||
}
|
||||
|
||||
if loginParam.PublicKey != "" {
|
||||
|
||||
c.Set(LoginParamCache, loginParam)
|
||||
c.Set(types.ProverName, loginParam.Message.ProverName)
|
||||
// publickey will also be set since we have specified public_key as identical key
|
||||
return loginParam.PublicKey
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
246
coordinator/internal/controller/proxy/client.go
Normal file
246
coordinator/internal/controller/proxy/client.go
Normal file
@@ -0,0 +1,246 @@
|
||||
//nolint:errcheck,bodyclose // body is closed in the following handleHttpResp call
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
|
||||
ctypes "scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
type ProxyCli interface {
|
||||
Login(ctx context.Context, genLogin func(string) (*types.LoginParameter, error)) (*ctypes.Response, error)
|
||||
ProxyLogin(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error)
|
||||
Token() string
|
||||
Reset()
|
||||
}
|
||||
|
||||
type ProverCli interface {
|
||||
GetTask(ctx context.Context, param *types.GetTaskParameter) (*ctypes.Response, error)
|
||||
SubmitProof(ctx context.Context, param *types.SubmitProofParameter) (*ctypes.Response, error)
|
||||
}
|
||||
|
||||
// Client wraps an http client with a preset host for coordinator API calls
|
||||
type upClient struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
loginToken string
|
||||
compatibileMode bool
|
||||
resetFromMgr func()
|
||||
}
|
||||
|
||||
// NewClient creates a new Client with the specified host
|
||||
func newUpClient(cfg *config.UpStream) *upClient {
|
||||
return &upClient{
|
||||
httpClient: &http.Client{
|
||||
Timeout: time.Duration(cfg.ConnectionTimeoutSec) * time.Second,
|
||||
},
|
||||
baseURL: cfg.BaseUrl,
|
||||
compatibileMode: cfg.CompatibileMode,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *upClient) Reset() {
|
||||
if c.resetFromMgr != nil {
|
||||
c.resetFromMgr()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *upClient) Token() string {
|
||||
return c.loginToken
|
||||
}
|
||||
|
||||
// need a parsable schema definition
|
||||
type loginSchema struct {
|
||||
Time string `json:"time"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
// Login performs the complete login process: get challenge then login
|
||||
func (c *upClient) Login(ctx context.Context, genLogin func(string) (*types.LoginParameter, error)) (*ctypes.Response, error) {
|
||||
// Step 1: Get challenge
|
||||
url := fmt.Sprintf("%s/coordinator/v1/challenge", c.baseURL)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create challenge request: %w", err)
|
||||
}
|
||||
|
||||
challengeResp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get challenge: %w", err)
|
||||
}
|
||||
|
||||
parsedResp, err := handleHttpResp(challengeResp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if parsedResp.ErrCode != 0 {
|
||||
return nil, fmt.Errorf("challenge failed: %d (%s)", parsedResp.ErrCode, parsedResp.ErrMsg)
|
||||
}
|
||||
|
||||
// Ste p2: Parse challenge response
|
||||
var challengeSchema loginSchema
|
||||
if err := parsedResp.DecodeData(&challengeSchema); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse challenge response: %w", err)
|
||||
}
|
||||
|
||||
// Step 3: Use the token from challenge as Bearer token for login
|
||||
url = fmt.Sprintf("%s/coordinator/v1/login", c.baseURL)
|
||||
|
||||
param, err := genLogin(challengeSchema.Token)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to setup login parameter: %w", err)
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(param)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal login parameter: %w", err)
|
||||
}
|
||||
|
||||
req, err = http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create login request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+challengeSchema.Token)
|
||||
|
||||
loginResp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to perform login request: %w", err)
|
||||
}
|
||||
return handleHttpResp(loginResp)
|
||||
}
|
||||
|
||||
func handleHttpResp(resp *http.Response) (*ctypes.Response, error) {
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusUnauthorized {
|
||||
var respWithData ctypes.Response
|
||||
// Note: Body is consumed after decoding, caller should not read it again
|
||||
if err := json.NewDecoder(resp.Body).Decode(&respWithData); err == nil {
|
||||
return &respWithData, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("login parsing expected response failed: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
return nil, fmt.Errorf("login request failed with status: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
func (c *upClient) proxyLoginCompatibleMode(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error) {
|
||||
mimePrivK, err := buildPrivateKey([]byte(param.PublicKey))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mimePkHex := common.Bytes2Hex(crypto.CompressPubkey(&mimePrivK.PublicKey))
|
||||
|
||||
genLoginParam := func(challenge string) (*types.LoginParameter, error) {
|
||||
|
||||
// Create login parameter with proxy settings
|
||||
loginParam := &types.LoginParameter{
|
||||
Message: param.Message,
|
||||
PublicKey: mimePkHex,
|
||||
}
|
||||
loginParam.Message.Challenge = challenge
|
||||
|
||||
// Sign the message with the private key
|
||||
if err := loginParam.SignWithKey(mimePrivK); err != nil {
|
||||
return nil, fmt.Errorf("failed to sign login parameter: %w", err)
|
||||
}
|
||||
|
||||
return loginParam, nil
|
||||
}
|
||||
|
||||
return c.Login(ctx, genLoginParam)
|
||||
}
|
||||
|
||||
// ProxyLogin makes a POST request to /v1/proxy_login with LoginParameter
|
||||
func (c *upClient) ProxyLogin(ctx context.Context, param *types.LoginParameter) (*ctypes.Response, error) {
|
||||
|
||||
if c.compatibileMode {
|
||||
return c.proxyLoginCompatibleMode(ctx, param)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/coordinator/v1/proxy_login", c.baseURL)
|
||||
|
||||
jsonData, err := json.Marshal(param)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal proxy login parameter: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create proxy login request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+c.loginToken)
|
||||
|
||||
proxyLoginResp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to perform proxy login request: %w", err)
|
||||
}
|
||||
return handleHttpResp(proxyLoginResp)
|
||||
}
|
||||
|
||||
// GetTask makes a POST request to /v1/get_task with GetTaskParameter
|
||||
func (c *upClient) GetTask(ctx context.Context, param *types.GetTaskParameter) (*ctypes.Response, error) {
|
||||
url := fmt.Sprintf("%s/coordinator/v1/get_task", c.baseURL)
|
||||
|
||||
jsonData, err := json.Marshal(param)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal get task parameter: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create get task request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if c.loginToken != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.loginToken)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return handleHttpResp(resp)
|
||||
}
|
||||
|
||||
// SubmitProof makes a POST request to /v1/submit_proof with SubmitProofParameter
|
||||
func (c *upClient) SubmitProof(ctx context.Context, param *types.SubmitProofParameter) (*ctypes.Response, error) {
|
||||
url := fmt.Sprintf("%s/coordinator/v1/submit_proof", c.baseURL)
|
||||
|
||||
jsonData, err := json.Marshal(param)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal submit proof parameter: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create submit proof request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if c.loginToken != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.loginToken)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return handleHttpResp(resp)
|
||||
}
|
||||
220
coordinator/internal/controller/proxy/client_manager.go
Normal file
220
coordinator/internal/controller/proxy/client_manager.go
Normal file
@@ -0,0 +1,220 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/version"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
// a client to access upstream coordinator with specified identity
|
||||
// so prover can contact with coordinator as itself
|
||||
Client(string) ProverCli
|
||||
// the client to access upstream as proxy itself
|
||||
ClientAsProxy(context.Context) ProxyCli
|
||||
Name() string
|
||||
}
|
||||
|
||||
type ClientManager struct {
|
||||
name string
|
||||
cliCfg *config.ProxyClient
|
||||
cfg *config.UpStream
|
||||
privKey *ecdsa.PrivateKey
|
||||
|
||||
cachedCli struct {
|
||||
sync.RWMutex
|
||||
cli *upClient
|
||||
completionCtx context.Context
|
||||
}
|
||||
}
|
||||
|
||||
// transformToValidPrivateKey safely transforms arbitrary bytes into valid private key bytes
|
||||
func buildPrivateKey(inputBytes []byte) (*ecdsa.PrivateKey, error) {
|
||||
// Try appending bytes from 0x0 to 0x20 until we get a valid private key
|
||||
for appendByte := byte(0x0); appendByte <= 0x20; appendByte++ {
|
||||
// Append the byte to input
|
||||
extendedBytes := append(inputBytes, appendByte)
|
||||
|
||||
// Calculate 256-bit hash
|
||||
hash := crypto.Keccak256(extendedBytes)
|
||||
|
||||
// Try to create private key from hash
|
||||
if k, err := crypto.ToECDSA(hash); err == nil {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to generate valid private key from input bytes")
|
||||
}
|
||||
|
||||
func NewClientManager(name string, cliCfg *config.ProxyClient, cfg *config.UpStream) (*ClientManager, error) {
|
||||
|
||||
log.Info("init client", "name", name, "upcfg", cfg.BaseUrl, "compatible mode", cfg.CompatibileMode)
|
||||
privKey, err := buildPrivateKey([]byte(cliCfg.Secret))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ClientManager{
|
||||
name: name,
|
||||
privKey: privKey,
|
||||
cfg: cfg,
|
||||
cliCfg: cliCfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type ctxKeyType string
|
||||
|
||||
const loginCliKey ctxKeyType = "cli"
|
||||
|
||||
func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) {
|
||||
if cliMgr.cfg.CompatibileMode {
|
||||
loginCli.loginToken = "dummy"
|
||||
log.Info("Skip login process for compatible mode")
|
||||
return
|
||||
}
|
||||
|
||||
// Calculate wait time between 2 seconds and cfg.RetryWaitTime
|
||||
minWait := 2 * time.Second
|
||||
waitDuration := time.Duration(cliMgr.cfg.RetryWaitTime) * time.Second
|
||||
if waitDuration < minWait {
|
||||
waitDuration = minWait
|
||||
}
|
||||
|
||||
for {
|
||||
log.Info("proxy attempting login to upstream coordinator", "name", cliMgr.name)
|
||||
loginResp, err := loginCli.Login(ctx, cliMgr.genLoginParam)
|
||||
if err == nil && loginResp.ErrCode == 0 {
|
||||
var loginResult loginSchema
|
||||
err = loginResp.DecodeData(&loginResult)
|
||||
if err != nil {
|
||||
log.Error("login parsing data fail", "error", err)
|
||||
} else {
|
||||
loginCli.loginToken = loginResult.Token
|
||||
log.Info("login to upstream coordinator successful", "name", cliMgr.name, "time", loginResult.Time)
|
||||
// TODO: we need to parse time if we start making use of it
|
||||
return
|
||||
}
|
||||
} else if err != nil {
|
||||
log.Error("login process fail", "error", err)
|
||||
} else {
|
||||
log.Error("login get fail resp", "code", loginResp.ErrCode, "msg", loginResp.ErrMsg)
|
||||
}
|
||||
|
||||
log.Info("login to upstream coordinator failed, retrying", "name", cliMgr.name, "error", err, "waitDuration", waitDuration)
|
||||
|
||||
timer := time.NewTimer(waitDuration)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
// Continue to next retry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cliMgr *ClientManager) Name() string {
|
||||
return cliMgr.name
|
||||
}
|
||||
|
||||
func (cliMgr *ClientManager) Client(token string) ProverCli {
|
||||
loginCli := newUpClient(cliMgr.cfg)
|
||||
loginCli.loginToken = token
|
||||
return loginCli
|
||||
}
|
||||
|
||||
func (cliMgr *ClientManager) ClientAsProxy(ctx context.Context) ProxyCli {
|
||||
cliMgr.cachedCli.RLock()
|
||||
if cliMgr.cachedCli.cli != nil {
|
||||
defer cliMgr.cachedCli.RUnlock()
|
||||
return cliMgr.cachedCli.cli
|
||||
}
|
||||
cliMgr.cachedCli.RUnlock()
|
||||
|
||||
cliMgr.cachedCli.Lock()
|
||||
if cliMgr.cachedCli.cli != nil {
|
||||
defer cliMgr.cachedCli.Unlock()
|
||||
return cliMgr.cachedCli.cli
|
||||
}
|
||||
|
||||
var completionCtx context.Context
|
||||
// Check if completion context is set
|
||||
if cliMgr.cachedCli.completionCtx != nil {
|
||||
completionCtx = cliMgr.cachedCli.completionCtx
|
||||
} else {
|
||||
// Set new completion context and launch login goroutine
|
||||
ctx, completionDone := context.WithCancel(context.Background())
|
||||
loginCli := newUpClient(cliMgr.cfg)
|
||||
loginCli.resetFromMgr = func() {
|
||||
cliMgr.cachedCli.Lock()
|
||||
if cliMgr.cachedCli.cli == loginCli {
|
||||
log.Info("cached client cleared", "name", cliMgr.name)
|
||||
cliMgr.cachedCli.cli = nil
|
||||
}
|
||||
cliMgr.cachedCli.Unlock()
|
||||
}
|
||||
completionCtx = context.WithValue(ctx, loginCliKey, loginCli)
|
||||
cliMgr.cachedCli.completionCtx = completionCtx
|
||||
|
||||
// Launch keep-login goroutine
|
||||
go func() {
|
||||
defer completionDone()
|
||||
cliMgr.doLogin(context.Background(), loginCli)
|
||||
|
||||
cliMgr.cachedCli.Lock()
|
||||
cliMgr.cachedCli.cli = loginCli
|
||||
cliMgr.cachedCli.completionCtx = nil
|
||||
|
||||
cliMgr.cachedCli.Unlock()
|
||||
|
||||
}()
|
||||
}
|
||||
cliMgr.cachedCli.Unlock()
|
||||
|
||||
// Wait for completion or request cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-completionCtx.Done():
|
||||
cli := completionCtx.Value(loginCliKey).(*upClient)
|
||||
return cli
|
||||
}
|
||||
}
|
||||
|
||||
func (cliMgr *ClientManager) genLoginParam(challenge string) (*types.LoginParameter, error) {
|
||||
|
||||
// Generate public key string
|
||||
publicKeyHex := common.Bytes2Hex(crypto.CompressPubkey(&cliMgr.privKey.PublicKey))
|
||||
|
||||
// Create login parameter with proxy settings
|
||||
loginParam := &types.LoginParameter{
|
||||
Message: types.Message{
|
||||
Challenge: challenge,
|
||||
ProverName: cliMgr.cliCfg.ProxyName,
|
||||
ProverVersion: version.Version,
|
||||
ProverProviderType: types.ProverProviderTypeProxy,
|
||||
ProverTypes: []types.ProverType{}, // Default empty
|
||||
VKs: []string{}, // Default empty
|
||||
},
|
||||
PublicKey: publicKeyHex,
|
||||
}
|
||||
|
||||
// Sign the message with the private key
|
||||
if err := loginParam.SignWithKey(cliMgr.privKey); err != nil {
|
||||
return nil, fmt.Errorf("failed to sign login parameter: %w", err)
|
||||
}
|
||||
|
||||
return loginParam, nil
|
||||
}
|
||||
44
coordinator/internal/controller/proxy/controller.go
Normal file
44
coordinator/internal/controller/proxy/controller.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
)
|
||||
|
||||
var (
|
||||
// GetTask the prover task controller
|
||||
GetTask *GetTaskController
|
||||
// SubmitProof the submit proof controller
|
||||
SubmitProof *SubmitProofController
|
||||
// Auth the auth controller
|
||||
Auth *AuthController
|
||||
)
|
||||
|
||||
// Clients manager a series of thread-safe clients for requesting upstream
|
||||
// coordinators
|
||||
type Clients map[string]Client
|
||||
|
||||
// InitController inits Controller with database
|
||||
func InitController(cfg *config.ProxyConfig, db *gorm.DB, reg prometheus.Registerer) {
|
||||
// normalize cfg
|
||||
cfg.ProxyManager.Normalize()
|
||||
|
||||
clients := make(map[string]Client)
|
||||
|
||||
for nm, upCfg := range cfg.Coordinators {
|
||||
cli, err := NewClientManager(nm, cfg.ProxyManager.Client, upCfg)
|
||||
if err != nil {
|
||||
panic("create new client fail")
|
||||
}
|
||||
clients[cli.Name()] = cli
|
||||
}
|
||||
|
||||
proverManager := NewProverManagerWithPersistent(100, db, reg)
|
||||
priorityManager := NewPriorityUpstreamManagerPersistent(db)
|
||||
|
||||
Auth = NewAuthController(cfg, clients, proverManager)
|
||||
GetTask = NewGetTaskController(cfg, clients, proverManager, priorityManager, reg)
|
||||
SubmitProof = NewSubmitProofController(cfg, clients, proverManager, priorityManager, reg)
|
||||
}
|
||||
230
coordinator/internal/controller/proxy/get_task.go
Normal file
230
coordinator/internal/controller/proxy/get_task.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
func getSessionData(ctx *gin.Context) (string, string) {
|
||||
|
||||
publicKeyData, publicKeyExist := ctx.Get(coordinatorType.PublicKey)
|
||||
publicKey, castOk := publicKeyData.(string)
|
||||
if !publicKeyExist || !castOk {
|
||||
nerr := fmt.Errorf("no public key binding: %v", publicKeyData)
|
||||
log.Warn("get_task parameter fail", "error", nerr)
|
||||
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorParameterInvalidNo, nerr)
|
||||
return "", ""
|
||||
}
|
||||
|
||||
publicNameData, publicNameExist := ctx.Get(coordinatorType.ProverName)
|
||||
publicName, castOk := publicNameData.(string)
|
||||
if !publicNameExist || !castOk {
|
||||
log.Error("no public name binding for unknown reason, but we still forward with name = 'unknown'", "data", publicNameData)
|
||||
publicName = "unknown"
|
||||
}
|
||||
|
||||
return publicKey, publicName
|
||||
}
|
||||
|
||||
// PriorityUpstreamManager manages priority upstream mappings with thread safety
|
||||
type PriorityUpstreamManager struct {
|
||||
sync.RWMutex
|
||||
*proverPriorityPersist
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
// NewPriorityUpstreamManager creates a new PriorityUpstreamManager
|
||||
func NewPriorityUpstreamManager() *PriorityUpstreamManager {
|
||||
return &PriorityUpstreamManager{
|
||||
data: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// NewPriorityUpstreamManager creates a new PriorityUpstreamManager
|
||||
func NewPriorityUpstreamManagerPersistent(db *gorm.DB) *PriorityUpstreamManager {
|
||||
return &PriorityUpstreamManager{
|
||||
data: make(map[string]string),
|
||||
proverPriorityPersist: NewProverPriorityPersist(db),
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves the priority upstream for a given key
|
||||
func (p *PriorityUpstreamManager) Get(key string) (string, bool) {
|
||||
|
||||
p.RLock()
|
||||
value, exists := p.data[key]
|
||||
p.RUnlock()
|
||||
|
||||
if !exists {
|
||||
if v, err := p.proverPriorityPersist.Get(key); err != nil {
|
||||
log.Error("persistent priority record read failure", "error", err, "key", key)
|
||||
} else if v != "" {
|
||||
log.Debug("restore record from persistent layer", "key", key, "value", v)
|
||||
return v, true
|
||||
}
|
||||
}
|
||||
|
||||
return value, exists
|
||||
}
|
||||
|
||||
// Set sets the priority upstream for a given key
|
||||
func (p *PriorityUpstreamManager) Set(key, value string) {
|
||||
defer func() {
|
||||
if err := p.proverPriorityPersist.Update(key, value); err != nil {
|
||||
log.Error("update priority record failure", "error", err, "key", key, "value", value)
|
||||
}
|
||||
}()
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.data[key] = value
|
||||
}
|
||||
|
||||
// Delete removes the priority upstream for a given key
|
||||
func (p *PriorityUpstreamManager) Delete(key string) {
|
||||
defer func() {
|
||||
if err := p.proverPriorityPersist.Del(key); err != nil {
|
||||
log.Error("delete priority record failure", "error", err, "key", key)
|
||||
}
|
||||
}()
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
delete(p.data, key)
|
||||
}
|
||||
|
||||
// GetTaskController the get prover task api controller
|
||||
type GetTaskController struct {
|
||||
proverMgr *ProverManager
|
||||
clients Clients
|
||||
priorityUpstream *PriorityUpstreamManager
|
||||
|
||||
//workingRnd *rand.Rand
|
||||
//getTaskAccessCounter *prometheus.CounterVec
|
||||
}
|
||||
|
||||
// NewGetTaskController create a get prover task controller
|
||||
func NewGetTaskController(cfg *config.ProxyConfig, clients Clients, proverMgr *ProverManager, priorityMgr *PriorityUpstreamManager, reg prometheus.Registerer) *GetTaskController {
|
||||
// TODO: implement proxy get task controller initialization
|
||||
return &GetTaskController{
|
||||
priorityUpstream: priorityMgr,
|
||||
proverMgr: proverMgr,
|
||||
clients: clients,
|
||||
}
|
||||
}
|
||||
|
||||
// func (ptc *GetTaskController) incGetTaskAccessCounter(ctx *gin.Context) error {
|
||||
// // TODO: implement proxy get task access counter
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// GetTasks get assigned chunk/batch task
|
||||
func (ptc *GetTaskController) GetTasks(ctx *gin.Context) {
|
||||
|
||||
var getTaskParameter coordinatorType.GetTaskParameter
|
||||
if err := ctx.ShouldBind(&getTaskParameter); err != nil {
|
||||
nerr := fmt.Errorf("prover task parameter invalid, err:%w", err)
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorParameterInvalidNo, nerr)
|
||||
return
|
||||
}
|
||||
|
||||
publicKey, proverName := getSessionData(ctx)
|
||||
if publicKey == "" {
|
||||
return
|
||||
}
|
||||
|
||||
session := ptc.proverMgr.Get(publicKey)
|
||||
if session == nil {
|
||||
nerr := fmt.Errorf("Trigger re-login: can not get session for prover %s", proverName)
|
||||
// has to trigger a login in proving-sdk side with ErrJWTTokenExpired error
|
||||
types.RenderFailure(ctx, types.ErrJWTTokenExpired, nerr)
|
||||
return
|
||||
}
|
||||
|
||||
getTask := func(cli Client) (error, int) {
|
||||
log.Debug("Start get task", "up", cli.Name(), "cli", proverName)
|
||||
upStream := cli.Name()
|
||||
resp, err := session.GetTask(ctx, &getTaskParameter, cli)
|
||||
if err != nil {
|
||||
log.Error("Upstream error for get task", "error", err, "up", upStream, "cli", proverName)
|
||||
return err, types.ErrCoordinatorGetTaskFailure
|
||||
} else if resp.ErrCode != types.ErrCoordinatorEmptyProofData {
|
||||
|
||||
if resp.ErrCode != 0 {
|
||||
// simply dispatch the error from upstream to prover
|
||||
log.Error("Upstream has error resp for get task", "code", resp.ErrCode, "msg", resp.ErrMsg, "up", upStream, "cli", proverName)
|
||||
return fmt.Errorf("upstream failure %s:", resp.ErrMsg), resp.ErrCode
|
||||
}
|
||||
|
||||
var task coordinatorType.GetTaskSchema
|
||||
if err = resp.DecodeData(&task); err == nil {
|
||||
task.TaskID = formUpstreamWithTaskName(upStream, task.TaskID)
|
||||
ptc.priorityUpstream.Set(publicKey, upStream)
|
||||
log.Debug("Upstream get task", "up", upStream, "cli", proverName, "taskID", task.TaskID, "taskType", task.TaskType)
|
||||
types.RenderSuccess(ctx, &task)
|
||||
return nil, 0
|
||||
} else {
|
||||
log.Error("Upstream has wrong data for get task", "error", err, "up", upStream, "cli", proverName)
|
||||
return fmt.Errorf("decode task fail: %v", err), types.InternalServerError
|
||||
}
|
||||
}
|
||||
|
||||
return nil, resp.ErrCode
|
||||
}
|
||||
|
||||
// if the priority upstream is set, we try this upstream first until get the task resp or no task resp
|
||||
priorityUpstream, exist := ptc.priorityUpstream.Get(publicKey)
|
||||
if exist {
|
||||
cli := ptc.clients[priorityUpstream]
|
||||
log.Debug("Try get task from priority stream", "up", priorityUpstream, "cli", proverName)
|
||||
if cli != nil {
|
||||
err, code := getTask(cli)
|
||||
if err != nil {
|
||||
types.RenderFailure(ctx, code, err)
|
||||
return
|
||||
} else if code == 0 {
|
||||
// get task done and rendered, return
|
||||
return
|
||||
}
|
||||
// only continue if get empty task (the task has been removed in upstream)
|
||||
log.Debug("can not get priority task from upstream", "up", priorityUpstream, "cli", proverName)
|
||||
|
||||
} else {
|
||||
log.Warn("A upstream is removed or lost for some reason while running", "up", priorityUpstream, "cli", proverName)
|
||||
}
|
||||
}
|
||||
ptc.priorityUpstream.Delete(publicKey)
|
||||
|
||||
// Create a slice to hold the keys
|
||||
keys := make([]string, 0, len(ptc.clients))
|
||||
for k := range ptc.clients {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
// Shuffle the keys using a local RNG (avoid deprecated rand.Seed)
|
||||
rand.Shuffle(len(keys), func(i, j int) {
|
||||
keys[i], keys[j] = keys[j], keys[i]
|
||||
})
|
||||
|
||||
// Iterate over the shuffled keys
|
||||
for _, n := range keys {
|
||||
if err, code := getTask(ptc.clients[n]); err == nil && code == 0 {
|
||||
// get task done
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("get no task from upstream", "cli", proverName)
|
||||
// if all get task failed, throw empty proof resp
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorEmptyProofData, fmt.Errorf("get empty prover task"))
|
||||
}
|
||||
125
coordinator/internal/controller/proxy/persistent.go
Normal file
125
coordinator/internal/controller/proxy/persistent.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
type proverDataPersist struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewProverDataPersist creates a persistence instance backed by a gorm DB.
|
||||
func NewProverDataPersist(db *gorm.DB) *proverDataPersist {
|
||||
return &proverDataPersist{db: db}
|
||||
}
|
||||
|
||||
// gorm model mapping to table `prover_sessions`
|
||||
type proverSessionRecord struct {
|
||||
PublicKey string `gorm:"column:public_key;not null"`
|
||||
Upstream string `gorm:"column:upstream;not null"`
|
||||
UpToken string `gorm:"column:up_token;not null"`
|
||||
Expired time.Time `gorm:"column:expired;not null"`
|
||||
}
|
||||
|
||||
func (proverSessionRecord) TableName() string { return "prover_sessions" }
|
||||
|
||||
// priority_upstream model
|
||||
type priorityUpstreamRecord struct {
|
||||
PublicKey string `gorm:"column:public_key;not null"`
|
||||
Upstream string `gorm:"column:upstream;not null"`
|
||||
}
|
||||
|
||||
func (priorityUpstreamRecord) TableName() string { return "priority_upstream" }
|
||||
|
||||
// get retrieves ProverSession for a given user key, returns empty if still not exists
|
||||
func (p *proverDataPersist) Get(userKey string) (*proverSession, error) {
|
||||
if p == nil || p.db == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var rows []proverSessionRecord
|
||||
if err := p.db.Where("public_key = ?", userKey).Find(&rows).Error; err != nil || len(rows) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := &proverSession{
|
||||
proverToken: make(map[string]loginToken),
|
||||
}
|
||||
for _, r := range rows {
|
||||
ls := &types.LoginSchema{
|
||||
Token: r.UpToken,
|
||||
Time: r.Expired,
|
||||
}
|
||||
ret.proverToken[r.Upstream] = loginToken{LoginSchema: ls}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (p *proverDataPersist) Update(userKey, up string, login *types.LoginSchema) error {
|
||||
if p == nil || p.db == nil || login == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rec := proverSessionRecord{
|
||||
PublicKey: userKey,
|
||||
Upstream: up,
|
||||
UpToken: login.Token,
|
||||
Expired: login.Time,
|
||||
}
|
||||
|
||||
return p.db.Clauses(
|
||||
clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "public_key"}, {Name: "upstream"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"up_token", "expired"}),
|
||||
},
|
||||
).Create(&rec).Error
|
||||
}
|
||||
|
||||
type proverPriorityPersist struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
func NewProverPriorityPersist(db *gorm.DB) *proverPriorityPersist {
|
||||
return &proverPriorityPersist{db: db}
|
||||
}
|
||||
|
||||
func (p *proverPriorityPersist) Get(userKey string) (string, error) {
|
||||
if p == nil || p.db == nil {
|
||||
return "", nil
|
||||
}
|
||||
var rec priorityUpstreamRecord
|
||||
if err := p.db.Where("public_key = ?", userKey).First(&rec).Error; err != nil {
|
||||
if err != gorm.ErrRecordNotFound {
|
||||
return "", err
|
||||
} else {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
}
|
||||
return rec.Upstream, nil
|
||||
}
|
||||
|
||||
func (p *proverPriorityPersist) Update(userKey, up string) error {
|
||||
if p == nil || p.db == nil {
|
||||
return nil
|
||||
}
|
||||
rec := priorityUpstreamRecord{PublicKey: userKey, Upstream: up}
|
||||
return p.db.Clauses(
|
||||
clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "public_key"}},
|
||||
DoUpdates: clause.Assignments(map[string]interface{}{"upstream": up}),
|
||||
},
|
||||
).Create(&rec).Error
|
||||
}
|
||||
|
||||
func (p *proverPriorityPersist) Del(userKey string) error {
|
||||
if p == nil || p.db == nil {
|
||||
return nil
|
||||
}
|
||||
return p.db.Where("public_key = ?", userKey).Delete(&priorityUpstreamRecord{}).Error
|
||||
}
|
||||
343
coordinator/internal/controller/proxy/prover_session.go
Normal file
343
coordinator/internal/controller/proxy/prover_session.go
Normal file
@@ -0,0 +1,343 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
ctypes "scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
type ProverManager struct {
|
||||
sync.RWMutex
|
||||
data map[string]*proverSession
|
||||
willDeprecatedData map[string]*proverSession
|
||||
sizeLimit int
|
||||
persistent *proverDataPersist
|
||||
|
||||
clientLoginTries *prometheus.CounterVec
|
||||
getTaskTries *prometheus.CounterVec
|
||||
submitTaskTries *prometheus.CounterVec
|
||||
|
||||
upstreamFail *prometheus.GaugeVec
|
||||
clientLoginFail *prometheus.CounterVec
|
||||
getTaskFail *prometheus.CounterVec
|
||||
submitTaskFail *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func NewProverManager(size int, reg prometheus.Registerer) *ProverManager {
|
||||
m := &ProverManager{
|
||||
data: make(map[string]*proverSession),
|
||||
willDeprecatedData: make(map[string]*proverSession),
|
||||
sizeLimit: size,
|
||||
}
|
||||
m.registerCounters(reg)
|
||||
return m
|
||||
}
|
||||
|
||||
func NewProverManagerWithPersistent(size int, db *gorm.DB, reg prometheus.Registerer) *ProverManager {
|
||||
m := &ProverManager{
|
||||
data: make(map[string]*proverSession),
|
||||
willDeprecatedData: make(map[string]*proverSession),
|
||||
sizeLimit: size,
|
||||
persistent: NewProverDataPersist(db),
|
||||
}
|
||||
m.registerCounters(reg)
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *ProverManager) registerCounters(reg prometheus.Registerer) {
|
||||
m.upstreamFail = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "upstream_error",
|
||||
Help: "Set to 1 while a upstream session can not be obtained",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.clientLoginFail = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "login_failure",
|
||||
Help: "Client login has encountered an error",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.getTaskFail = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "getTask_failure",
|
||||
Help: "GetTask request has encountered an error",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.submitTaskFail = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "submitTask_failure",
|
||||
Help: "SubmitTask request has encountered an error",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.clientLoginTries = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prover_speed",
|
||||
Help: "Cycle against running time of prover (in mhz)",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.getTaskTries = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prover_speed",
|
||||
Help: "Cycle against running time of prover (in mhz)",
|
||||
}, []string{"upstream"})
|
||||
|
||||
m.submitTaskTries = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prover_speed",
|
||||
Help: "Cycle against running time of prover (in mhz)",
|
||||
}, []string{"upstream"})
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(m.upstreamFail)
|
||||
reg.MustRegister(m.clientLoginFail)
|
||||
}
|
||||
}
|
||||
|
||||
// get retrieves ProverSession for a given user key, returns empty if still not exists
|
||||
func (m *ProverManager) Get(userKey string) (ret *proverSession) {
|
||||
defer func() {
|
||||
if ret == nil {
|
||||
var err error
|
||||
ret, err = m.persistent.Get(userKey)
|
||||
if err != nil {
|
||||
log.Error("Get persistent layer for prover tokens fail", "error", err)
|
||||
} else if ret != nil {
|
||||
log.Debug("restore record from persistent", "key", userKey)
|
||||
ret.persistent = m.persistent
|
||||
}
|
||||
}
|
||||
|
||||
if ret != nil {
|
||||
m.Lock()
|
||||
m.data[userKey] = ret
|
||||
m.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
if r, existed := m.data[userKey]; existed {
|
||||
return r
|
||||
} else {
|
||||
return m.willDeprecatedData[userKey]
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ProverManager) GetOrCreate(userKey string) *proverSession {
|
||||
|
||||
if ret := m.Get(userKey); ret != nil {
|
||||
return ret
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
ret := &proverSession{
|
||||
proverToken: make(map[string]loginToken),
|
||||
persistent: m.persistent,
|
||||
}
|
||||
|
||||
if len(m.data) >= m.sizeLimit {
|
||||
m.willDeprecatedData = m.data
|
||||
m.data = make(map[string]*proverSession)
|
||||
}
|
||||
|
||||
m.data[userKey] = ret
|
||||
return ret
|
||||
}
|
||||
|
||||
type loginToken struct {
|
||||
*types.LoginSchema
|
||||
phase uint
|
||||
}
|
||||
|
||||
// Client wraps an http client with a preset host for coordinator API calls
|
||||
type proverSession struct {
|
||||
persistent *proverDataPersist
|
||||
|
||||
sync.RWMutex
|
||||
proverToken map[string]loginToken
|
||||
completionCtx context.Context
|
||||
}
|
||||
|
||||
func (c *proverSession) maintainLogin(ctx context.Context, cliMgr Client, up string, param *types.LoginParameter, phase uint) (result loginToken, nerr error) {
|
||||
c.Lock()
|
||||
curPhase := c.proverToken[up].phase
|
||||
if c.completionCtx != nil {
|
||||
waitctx := c.completionCtx
|
||||
c.Unlock()
|
||||
select {
|
||||
case <-waitctx.Done():
|
||||
return c.maintainLogin(ctx, cliMgr, up, param, phase)
|
||||
case <-ctx.Done():
|
||||
nerr = fmt.Errorf("ctx fail")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if phase < curPhase {
|
||||
// outdate login phase, give up
|
||||
log.Debug("drop outdated proxy login attempt", "upstream", up, "cli", param.Message.ProverName, "phase", phase, "now", curPhase)
|
||||
defer c.Unlock()
|
||||
return c.proverToken[up], nil
|
||||
}
|
||||
|
||||
// occupy the update slot
|
||||
completeCtx, cf := context.WithCancel(context.Background())
|
||||
defer cf()
|
||||
c.completionCtx = completeCtx
|
||||
defer func() {
|
||||
c.Lock()
|
||||
c.completionCtx = nil
|
||||
if result.LoginSchema != nil {
|
||||
c.proverToken[up] = result
|
||||
log.Info("maintain login status", "upstream", up, "cli", param.Message.ProverName, "phase", curPhase+1)
|
||||
}
|
||||
c.Unlock()
|
||||
if nerr != nil {
|
||||
log.Error("maintain login fail", "error", nerr, "upstream", up, "cli", param.Message.ProverName, "phase", curPhase)
|
||||
}
|
||||
}()
|
||||
c.Unlock()
|
||||
|
||||
log.Debug("start proxy login process", "upstream", up, "cli", param.Message.ProverName)
|
||||
|
||||
cli := cliMgr.ClientAsProxy(ctx)
|
||||
if cli == nil {
|
||||
nerr = fmt.Errorf("get upstream cli fail")
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := cli.ProxyLogin(ctx, param)
|
||||
if err != nil {
|
||||
nerr = fmt.Errorf("proxylogin fail: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if resp.ErrCode == ctypes.ErrJWTTokenExpired || resp.ErrCode == ctypes.ErrJWTCommonErr {
|
||||
log.Info("up stream has expired, renew upstream connection", "up", up, "errcode", resp.ErrCode)
|
||||
cli.Reset()
|
||||
cli = cliMgr.ClientAsProxy(ctx)
|
||||
if cli == nil {
|
||||
nerr = fmt.Errorf("get upstream cli fail (secondary try)")
|
||||
return
|
||||
}
|
||||
|
||||
// like SDK, we would try one more time if the upstream token is expired
|
||||
resp, err = cli.ProxyLogin(ctx, param)
|
||||
if err != nil {
|
||||
nerr = fmt.Errorf("proxylogin fail: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if resp.ErrCode != 0 {
|
||||
nerr = fmt.Errorf("upstream fail: %d (%s)", resp.ErrCode, resp.ErrMsg)
|
||||
return
|
||||
}
|
||||
|
||||
var loginResult loginSchema
|
||||
if err := resp.DecodeData(&loginResult); err != nil {
|
||||
nerr = err
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Proxy login done", "upstream", up, "cli", param.Message.ProverName)
|
||||
result = loginToken{
|
||||
LoginSchema: &types.LoginSchema{
|
||||
Token: loginResult.Token,
|
||||
},
|
||||
phase: curPhase + 1,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// const expireTolerant = 10 * time.Minute
|
||||
|
||||
// ProxyLogin makes a POST request to /v1/proxy_login with LoginParameter
|
||||
func (c *proverSession) ProxyLogin(ctx context.Context, cli Client, param *types.LoginParameter) error {
|
||||
up := cli.Name()
|
||||
c.RLock()
|
||||
existedToken := c.proverToken[up]
|
||||
c.RUnlock()
|
||||
|
||||
newtoken, err := c.maintainLogin(ctx, cli, up, param, math.MaxUint)
|
||||
if newtoken.phase > existedToken.phase {
|
||||
if err := c.persistent.Update(param.PublicKey, up, newtoken.LoginSchema); err != nil {
|
||||
log.Error("Update persistent layer for prover tokens fail", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetTask makes a POST request to /v1/get_task with GetTaskParameter
|
||||
func (c *proverSession) GetTask(ctx context.Context, param *types.GetTaskParameter, cliMgr Client) (*ctypes.Response, error) {
|
||||
up := cliMgr.Name()
|
||||
c.RLock()
|
||||
log.Debug("call get task", "up", up, "tokens", c.proverToken)
|
||||
token := c.proverToken[up]
|
||||
c.RUnlock()
|
||||
|
||||
if token.LoginSchema != nil {
|
||||
resp, err := cliMgr.Client(token.Token).GetTask(ctx, param)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.ErrCode != ctypes.ErrJWTTokenExpired && resp.ErrCode != ctypes.ErrJWTCommonErr {
|
||||
return resp, nil
|
||||
}
|
||||
log.Debug("Get Task first-try failed for broken token", "up", up, "errcode", resp.ErrCode)
|
||||
}
|
||||
|
||||
// like SDK, we would try one more time if the upstream token is expired
|
||||
// get param from ctx
|
||||
loginParam, ok := ctx.Value(LoginParamCache).(*types.LoginParameter)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unexpected error, no loginparam ctx value")
|
||||
}
|
||||
|
||||
newToken, err := c.maintainLogin(ctx, cliMgr, up, loginParam, token.phase)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("update prover token fail: %v", err)
|
||||
}
|
||||
|
||||
return cliMgr.Client(newToken.Token).GetTask(ctx, param)
|
||||
|
||||
}
|
||||
|
||||
// SubmitProof makes a POST request to /v1/submit_proof with SubmitProofParameter
|
||||
func (c *proverSession) SubmitProof(ctx context.Context, param *types.SubmitProofParameter, cliMgr Client) (*ctypes.Response, error) {
|
||||
up := cliMgr.Name()
|
||||
c.RLock()
|
||||
token := c.proverToken[up]
|
||||
c.RUnlock()
|
||||
|
||||
if token.LoginSchema != nil {
|
||||
resp, err := cliMgr.Client(token.Token).SubmitProof(ctx, param)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.ErrCode != ctypes.ErrJWTTokenExpired && resp.ErrCode != ctypes.ErrJWTCommonErr {
|
||||
return resp, nil
|
||||
}
|
||||
log.Debug("Submit Proof first-try failed for broken token", "up", up, "errcode", resp.ErrCode)
|
||||
}
|
||||
|
||||
// like SDK, we would try one more time if the upstream token is expired
|
||||
// get param from ctx
|
||||
loginParam, ok := ctx.Value(LoginParamCache).(*types.LoginParameter)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unexpected error, no loginparam ctx value")
|
||||
}
|
||||
|
||||
newToken, err := c.maintainLogin(ctx, cliMgr, up, loginParam, token.phase)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("update prover token fail: %v", err)
|
||||
}
|
||||
|
||||
return cliMgr.Client(newToken.Token).SubmitProof(ctx, param)
|
||||
}
|
||||
107
coordinator/internal/controller/proxy/prover_session_test.go
Normal file
107
coordinator/internal/controller/proxy/prover_session_test.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestProverManagerGetAndCreate validates basic creation and retrieval semantics.
|
||||
func TestProverManagerGetAndCreate(t *testing.T) {
|
||||
pm := NewProverManager(2, nil)
|
||||
|
||||
if got := pm.Get("user1"); got != nil {
|
||||
t.Fatalf("expected nil for non-existent key, got: %+v", got)
|
||||
}
|
||||
|
||||
sess1 := pm.GetOrCreate("user1")
|
||||
if sess1 == nil {
|
||||
t.Fatalf("expected non-nil session from GetOrCreate")
|
||||
}
|
||||
|
||||
// Should be stable on subsequent Get
|
||||
if got := pm.Get("user1"); got != sess1 {
|
||||
t.Fatalf("expected same session pointer on Get, got different instance: %p vs %p", got, sess1)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProverManagerRolloverAndPromotion verifies rollover when sizeLimit is reached
|
||||
// and that old entries are accessible and promoted back to active data map.
|
||||
func TestProverManagerRolloverAndPromotion(t *testing.T) {
|
||||
pm := NewProverManager(2, nil)
|
||||
|
||||
s1 := pm.GetOrCreate("u1")
|
||||
s2 := pm.GetOrCreate("u2")
|
||||
if s1 == nil || s2 == nil {
|
||||
t.Fatalf("expected sessions to be created for u1/u2")
|
||||
}
|
||||
|
||||
// Precondition: data should contain 2 entries, no deprecated yet.
|
||||
pm.RLock()
|
||||
if len(pm.data) != 2 {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected data len=2 before rollover, got %d", len(pm.data))
|
||||
}
|
||||
if len(pm.willDeprecatedData) != 0 {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected willDeprecatedData len=0 before rollover, got %d", len(pm.willDeprecatedData))
|
||||
}
|
||||
pm.RUnlock()
|
||||
|
||||
// Trigger rollover by creating a third key.
|
||||
s3 := pm.GetOrCreate("u3")
|
||||
if s3 == nil {
|
||||
t.Fatalf("expected session for u3 after rollover")
|
||||
}
|
||||
|
||||
// After rollover: current data should only have u3, deprecated should hold u1 and u2.
|
||||
pm.RLock()
|
||||
if len(pm.data) != 1 {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected data len=1 after rollover (only u3), got %d", len(pm.data))
|
||||
}
|
||||
if _, ok := pm.data["u3"]; !ok {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected 'u3' to be in active data after rollover")
|
||||
}
|
||||
if len(pm.willDeprecatedData) != 2 {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected willDeprecatedData len=2 after rollover, got %d", len(pm.willDeprecatedData))
|
||||
}
|
||||
pm.RUnlock()
|
||||
|
||||
// Accessing an old key should return the same pointer and promote it to active data map.
|
||||
got1 := pm.Get("u1")
|
||||
if got1 != s1 {
|
||||
t.Fatalf("expected same pointer for u1 after promotion, got %p want %p", got1, s1)
|
||||
}
|
||||
|
||||
// The promotion should add it to active data (without enforcing size limit on promotion).
|
||||
pm.RLock()
|
||||
if _, ok := pm.data["u1"]; !ok {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected 'u1' to be present in active data after promotion")
|
||||
}
|
||||
if len(pm.data) != 2 {
|
||||
// Now should contain u3 and u1
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected data len=2 after promotion of u1, got %d", len(pm.data))
|
||||
}
|
||||
pm.RUnlock()
|
||||
|
||||
// Access the other deprecated key and ensure behavior is consistent.
|
||||
got2 := pm.Get("u2")
|
||||
if got2 != s2 {
|
||||
t.Fatalf("expected same pointer for u2 after promotion, got %p want %p", got2, s2)
|
||||
}
|
||||
|
||||
pm.RLock()
|
||||
if _, ok := pm.data["u2"]; !ok {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected 'u2' to be present in active data after promotion")
|
||||
}
|
||||
// Note: promotion does not enforce sizeLimit, so data can grow beyond sizeLimit after promotions.
|
||||
if len(pm.data) != 3 {
|
||||
pm.RUnlock()
|
||||
t.Fatalf("expected data len=3 after promoting both u1 and u2, got %d", len(pm.data))
|
||||
}
|
||||
pm.RUnlock()
|
||||
}
|
||||
95
coordinator/internal/controller/proxy/submit_proof.go
Normal file
95
coordinator/internal/controller/proxy/submit_proof.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
coordinatorType "scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
// SubmitProofController the submit proof api controller
|
||||
type SubmitProofController struct {
|
||||
proverMgr *ProverManager
|
||||
clients Clients
|
||||
priorityUpstream *PriorityUpstreamManager
|
||||
}
|
||||
|
||||
// NewSubmitProofController create the submit proof api controller instance
|
||||
func NewSubmitProofController(cfg *config.ProxyConfig, clients Clients, proverMgr *ProverManager, priorityMgr *PriorityUpstreamManager, reg prometheus.Registerer) *SubmitProofController {
|
||||
return &SubmitProofController{
|
||||
proverMgr: proverMgr,
|
||||
clients: clients,
|
||||
priorityUpstream: priorityMgr,
|
||||
}
|
||||
}
|
||||
|
||||
func upstreamFromTaskName(taskID string) (string, string) {
|
||||
parts, rest, found := strings.Cut(taskID, ":")
|
||||
if found {
|
||||
return parts, rest
|
||||
}
|
||||
return "", parts
|
||||
}
|
||||
|
||||
func formUpstreamWithTaskName(upstream string, taskID string) string {
|
||||
return fmt.Sprintf("%s:%s", upstream, taskID)
|
||||
}
|
||||
|
||||
// SubmitProof prover submit the proof to coordinator
|
||||
func (spc *SubmitProofController) SubmitProof(ctx *gin.Context) {
|
||||
|
||||
var submitParameter coordinatorType.SubmitProofParameter
|
||||
if err := ctx.ShouldBind(&submitParameter); err != nil {
|
||||
nerr := fmt.Errorf("prover submitProof parameter invalid, err:%w", err)
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorParameterInvalidNo, nerr)
|
||||
return
|
||||
}
|
||||
|
||||
publicKey, proverName := getSessionData(ctx)
|
||||
if publicKey == "" {
|
||||
return
|
||||
}
|
||||
|
||||
session := spc.proverMgr.Get(publicKey)
|
||||
if session == nil {
|
||||
nerr := fmt.Errorf("Trigger re-login: can not get session for prover %s", proverName)
|
||||
// has to trigger a login in proving-sdk side with ErrJWTTokenExpired error
|
||||
types.RenderFailure(ctx, types.ErrJWTTokenExpired, nerr)
|
||||
return
|
||||
}
|
||||
|
||||
upstream, realTaskID := upstreamFromTaskName(submitParameter.TaskID)
|
||||
cli, existed := spc.clients[upstream]
|
||||
if !existed {
|
||||
log.Warn("A upstream for submitting is removed or lost for some reason while running", "up", upstream)
|
||||
nerr := fmt.Errorf("Invalid upstream name (%s) from taskID %s", upstream, submitParameter.TaskID)
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorParameterInvalidNo, nerr)
|
||||
return
|
||||
}
|
||||
log.Debug("Start submitting", "up", upstream, "cli", proverName, "id", realTaskID, "status", submitParameter.Status)
|
||||
submitParameter.TaskID = realTaskID
|
||||
|
||||
resp, err := session.SubmitProof(ctx, &submitParameter, cli)
|
||||
if err != nil {
|
||||
log.Error("Upstream has error resp for submit", "error", err, "up", upstream, "cli", proverName, "taskID", realTaskID)
|
||||
types.RenderFailure(ctx, types.ErrCoordinatorGetTaskFailure, err)
|
||||
return
|
||||
} else if resp.ErrCode != 0 {
|
||||
log.Error("Upstream has error resp for get task", "code", resp.ErrCode, "msg", resp.ErrMsg, "up", upstream, "cli", proverName, "taskID", realTaskID)
|
||||
// simply dispatch the error from upstream to prover
|
||||
types.RenderFailure(ctx, resp.ErrCode, fmt.Errorf("%s", resp.ErrMsg))
|
||||
return
|
||||
} else {
|
||||
log.Debug("Submit proof to upstream", "up", upstream, "cli", proverName, "taskID", realTaskID)
|
||||
spc.priorityUpstream.Delete(publicKey)
|
||||
types.RenderSuccess(ctx, resp.Data)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -19,45 +20,72 @@ import (
|
||||
|
||||
// LoginLogic the auth logic
|
||||
type LoginLogic struct {
|
||||
cfg *config.Config
|
||||
challengeOrm *orm.Challenge
|
||||
cfg *config.VerifierConfig
|
||||
deduplicator ChallengeDeduplicator
|
||||
|
||||
openVmVks map[string]struct{}
|
||||
|
||||
proverVersionHardForkMap map[string]string
|
||||
}
|
||||
|
||||
type ChallengeDeduplicator interface {
|
||||
InsertChallenge(ctx context.Context, challengeString string) error
|
||||
}
|
||||
|
||||
type SimpleDeduplicator struct {
|
||||
}
|
||||
|
||||
func (s *SimpleDeduplicator) InsertChallenge(ctx context.Context, challengeString string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewLoginLogicWithSimpleDEduplicator new a LoginLogic, do not use db to deduplicate challenge
|
||||
func NewLoginLogicWithSimpleDeduplicator(vcfg *config.VerifierConfig, vf *verifier.Verifier) *LoginLogic {
|
||||
return newLoginLogic(&SimpleDeduplicator{}, vcfg, vf)
|
||||
}
|
||||
|
||||
// NewLoginLogic new a LoginLogic
|
||||
func NewLoginLogic(db *gorm.DB, cfg *config.Config, vf *verifier.Verifier) *LoginLogic {
|
||||
func NewLoginLogic(db *gorm.DB, vcfg *config.VerifierConfig, vf *verifier.Verifier) *LoginLogic {
|
||||
return newLoginLogic(orm.NewChallenge(db), vcfg, vf)
|
||||
}
|
||||
|
||||
func newLoginLogic(deduplicator ChallengeDeduplicator, vcfg *config.VerifierConfig, vf *verifier.Verifier) *LoginLogic {
|
||||
|
||||
proverVersionHardForkMap := make(map[string]string)
|
||||
|
||||
for _, cfg := range cfg.ProverManager.Verifier.Verifiers {
|
||||
for _, cfg := range vcfg.Verifiers {
|
||||
proverVersionHardForkMap[cfg.ForkName] = cfg.MinProverVersion
|
||||
}
|
||||
|
||||
return &LoginLogic{
|
||||
cfg: cfg,
|
||||
cfg: vcfg,
|
||||
openVmVks: vf.OpenVMVkMap,
|
||||
challengeOrm: orm.NewChallenge(db),
|
||||
deduplicator: deduplicator,
|
||||
proverVersionHardForkMap: proverVersionHardForkMap,
|
||||
}
|
||||
}
|
||||
|
||||
// InsertChallengeString insert and check the challenge string is existed
|
||||
func (l *LoginLogic) InsertChallengeString(ctx *gin.Context, challenge string) error {
|
||||
return l.challengeOrm.InsertChallenge(ctx.Copy(), challenge)
|
||||
}
|
||||
|
||||
func (l *LoginLogic) Check(login *types.LoginParameter) error {
|
||||
// Verify the completeness of login message
|
||||
func VerifyMsg(login *types.LoginParameter) error {
|
||||
verify, err := login.Verify()
|
||||
if err != nil || !verify {
|
||||
log.Error("auth message verify failure", "prover_name", login.Message.ProverName,
|
||||
"prover_version", login.Message.ProverVersion, "message", login.Message)
|
||||
return errors.New("auth message verify failure")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !version.CheckScrollRepoVersion(login.Message.ProverVersion, l.cfg.ProverManager.Verifier.MinProverVersion) {
|
||||
return fmt.Errorf("incompatible prover version. please upgrade your prover, minimum allowed version: %s, actual version: %s", l.cfg.ProverManager.Verifier.MinProverVersion, login.Message.ProverVersion)
|
||||
// InsertChallengeString insert and check the challenge string is existed
|
||||
func (l *LoginLogic) InsertChallengeString(ctx *gin.Context, challenge string) error {
|
||||
return l.deduplicator.InsertChallenge(ctx.Copy(), challenge)
|
||||
}
|
||||
|
||||
// Check if the login client is compatible with the setting in coordinator
|
||||
func (l *LoginLogic) CompatiblityCheck(login *types.LoginParameter) error {
|
||||
|
||||
if !version.CheckScrollRepoVersion(login.Message.ProverVersion, l.cfg.MinProverVersion) {
|
||||
return fmt.Errorf("incompatible prover version. please upgrade your prover, minimum allowed version: %s, actual version: %s", l.cfg.MinProverVersion, login.Message.ProverVersion)
|
||||
}
|
||||
|
||||
vks := make(map[string]struct{})
|
||||
@@ -65,27 +93,32 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error {
|
||||
vks[vk] = struct{}{}
|
||||
}
|
||||
|
||||
for _, vk := range login.Message.VKs {
|
||||
if _, ok := vks[vk]; !ok {
|
||||
log.Error("vk inconsistency", "prover vk", vk, "prover name", login.Message.ProverName,
|
||||
"prover_version", login.Message.ProverVersion, "message", login.Message)
|
||||
if !version.CheckScrollProverVersion(login.Message.ProverVersion) {
|
||||
return fmt.Errorf("incompatible prover version. please upgrade your prover, expect version: %s, actual version: %s",
|
||||
version.Version, login.Message.ProverVersion)
|
||||
// new coordinator / proxy do not check vks while login, code only for backward compatibility
|
||||
if len(vks) != 0 {
|
||||
for _, vk := range login.Message.VKs {
|
||||
if _, ok := vks[vk]; !ok {
|
||||
log.Error("vk inconsistency", "prover vk", vk, "prover name", login.Message.ProverName,
|
||||
"prover_version", login.Message.ProverVersion, "message", login.Message)
|
||||
if !version.CheckScrollProverVersion(login.Message.ProverVersion) {
|
||||
return fmt.Errorf("incompatible prover version. please upgrade your prover, expect version: %s, actual version: %s",
|
||||
version.Version, login.Message.ProverVersion)
|
||||
}
|
||||
// if the prover reports a same prover version
|
||||
return errors.New("incompatible vk. please check your params files or config files")
|
||||
}
|
||||
// if the prover reports a same prover version
|
||||
return errors.New("incompatible vk. please check your params files or config files")
|
||||
}
|
||||
}
|
||||
|
||||
if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal {
|
||||
switch login.Message.ProverProviderType {
|
||||
case types.ProverProviderTypeInternal:
|
||||
case types.ProverProviderTypeExternal:
|
||||
case types.ProverProviderTypeProxy:
|
||||
case types.ProverProviderTypeUndefined:
|
||||
// for backward compatibility, set ProverProviderType as internal
|
||||
if login.Message.ProverProviderType == types.ProverProviderTypeUndefined {
|
||||
login.Message.ProverProviderType = types.ProverProviderTypeInternal
|
||||
} else {
|
||||
log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover version", login.Message.ProverVersion)
|
||||
return errors.New("invalid prover provider type.")
|
||||
}
|
||||
login.Message.ProverProviderType = types.ProverProviderTypeInternal
|
||||
default:
|
||||
log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover version", login.Message.ProverVersion)
|
||||
return errors.New("invalid prover provider type.")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
//go:build !mock_verifier
|
||||
|
||||
package libzkp
|
||||
|
||||
/*
|
||||
@@ -13,8 +15,6 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -72,31 +72,6 @@ func VerifyBundleProof(proofData, forkName string) bool {
|
||||
return result != 0
|
||||
}
|
||||
|
||||
// TaskType enum values matching the Rust enum
|
||||
const (
|
||||
TaskTypeChunk = 0
|
||||
TaskTypeBatch = 1
|
||||
TaskTypeBundle = 2
|
||||
)
|
||||
|
||||
func fromMessageTaskType(taskType int) int {
|
||||
switch message.ProofType(taskType) {
|
||||
case message.ProofTypeChunk:
|
||||
return TaskTypeChunk
|
||||
case message.ProofTypeBatch:
|
||||
return TaskTypeBatch
|
||||
case message.ProofTypeBundle:
|
||||
return TaskTypeBundle
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported proof type: %d", taskType))
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a universal task
|
||||
func GenerateUniversalTask(taskType int, taskJSON, forkName string, expectedVk []byte, decryptionKey []byte) (bool, string, string, []byte) {
|
||||
return generateUniversalTask(fromMessageTaskType(taskType), taskJSON, strings.ToLower(forkName), expectedVk, decryptionKey)
|
||||
}
|
||||
|
||||
// Generate wrapped proof
|
||||
func GenerateWrappedProof(proofJSON, metadata string, vkData []byte) string {
|
||||
cProofJSON := goToCString(proofJSON)
|
||||
|
||||
57
coordinator/internal/logic/libzkp/lib_mock.go
Normal file
57
coordinator/internal/logic/libzkp/lib_mock.go
Normal file
@@ -0,0 +1,57 @@
|
||||
//go:build mock_verifier
|
||||
|
||||
package libzkp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// // InitVerifier is a no-op in the mock.
|
||||
// func InitVerifier(configJSON string) {}
|
||||
|
||||
// // VerifyChunkProof returns a fixed success in the mock.
|
||||
// func VerifyChunkProof(proofData, forkName string) bool {
|
||||
// return true
|
||||
// }
|
||||
|
||||
// // VerifyBatchProof returns a fixed success in the mock.
|
||||
// func VerifyBatchProof(proofData, forkName string) bool {
|
||||
// return true
|
||||
// }
|
||||
|
||||
// // VerifyBundleProof returns a fixed success in the mock.
|
||||
// func VerifyBundleProof(proofData, forkName string) bool {
|
||||
// return true
|
||||
// }
|
||||
|
||||
func UniversalTaskCompatibilityFix(taskJSON string) (string, error) {
|
||||
panic("should not run here")
|
||||
}
|
||||
|
||||
// GenerateWrappedProof returns a fixed dummy proof string in the mock.
|
||||
func GenerateWrappedProof(proofJSON, metadata string, vkData []byte) string {
|
||||
|
||||
payload := struct {
|
||||
Metadata json.RawMessage `json:"metadata"`
|
||||
Proof json.RawMessage `json:"proof"`
|
||||
GitVersion string `json:"git_version"`
|
||||
}{
|
||||
Metadata: json.RawMessage(metadata),
|
||||
Proof: json.RawMessage(proofJSON),
|
||||
GitVersion: "mock-git-version",
|
||||
}
|
||||
|
||||
out, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return string(out)
|
||||
}
|
||||
|
||||
// DumpVk is a no-op and returns nil in the mock.
|
||||
func DumpVk(forkName, filePath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetDynamicFeature is a no-op in the mock.
|
||||
func SetDynamicFeature(feats string) {}
|
||||
27
coordinator/internal/logic/libzkp/message_types.go
Normal file
27
coordinator/internal/logic/libzkp/message_types.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package libzkp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
)
|
||||
|
||||
// TaskType enum values matching the Rust enum
|
||||
const (
|
||||
TaskTypeChunk = 0
|
||||
TaskTypeBatch = 1
|
||||
TaskTypeBundle = 2
|
||||
)
|
||||
|
||||
func fromMessageTaskType(taskType int) int {
|
||||
switch message.ProofType(taskType) {
|
||||
case message.ProofTypeChunk:
|
||||
return TaskTypeChunk
|
||||
case message.ProofTypeBatch:
|
||||
return TaskTypeBatch
|
||||
case message.ProofTypeBundle:
|
||||
return TaskTypeBundle
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported proof type: %d", taskType))
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ package libzkp
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"scroll-tech/common/types/message"
|
||||
|
||||
@@ -14,6 +15,10 @@ import (
|
||||
func InitL2geth(configJSON string) {
|
||||
}
|
||||
|
||||
func GenerateUniversalTask(taskType int, taskJSON, forkName string, expectedVk []byte, decryptionKey []byte) (bool, string, string, []byte) {
|
||||
return generateUniversalTask(fromMessageTaskType(taskType), taskJSON, strings.ToLower(forkName), expectedVk, decryptionKey)
|
||||
}
|
||||
|
||||
func generateUniversalTask(taskType int, taskJSON, forkName string, expectedVk []byte, decryptionKey []byte) (bool, string, string, []byte) {
|
||||
|
||||
fmt.Printf("call mocked generate universal task %d, taskJson %s\n", taskType, taskJSON)
|
||||
|
||||
@@ -7,7 +7,10 @@ package libzkp
|
||||
#include "libzkp.h"
|
||||
*/
|
||||
import "C" //nolint:typecheck
|
||||
import "unsafe"
|
||||
import (
|
||||
"strings"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// Initialize the handler for universal task
|
||||
func InitL2geth(configJSON string) {
|
||||
@@ -17,6 +20,11 @@ func InitL2geth(configJSON string) {
|
||||
C.init_l2geth(cConfig)
|
||||
}
|
||||
|
||||
// Generate a universal task
|
||||
func GenerateUniversalTask(taskType int, taskJSON, forkName string, expectedVk []byte, decryptionKey []byte) (bool, string, string, []byte) {
|
||||
return generateUniversalTask(fromMessageTaskType(taskType), taskJSON, strings.ToLower(forkName), expectedVk, decryptionKey)
|
||||
}
|
||||
|
||||
func generateUniversalTask(taskType int, taskJSON, forkName string, expectedVk []byte, decryptionKey []byte) (bool, string, string, []byte) {
|
||||
cTask := goToCString(taskJSON)
|
||||
cForkName := goToCString(forkName)
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
)
|
||||
|
||||
// ChallengeMiddleware jwt challenge middleware
|
||||
func ChallengeMiddleware(conf *config.Config) *jwt.GinJWTMiddleware {
|
||||
func ChallengeMiddleware(auth *config.Auth) *jwt.GinJWTMiddleware {
|
||||
jwtMiddleware, err := jwt.New(&jwt.GinJWTMiddleware{
|
||||
Authenticator: func(c *gin.Context) (interface{}, error) {
|
||||
return nil, nil
|
||||
@@ -30,8 +30,8 @@ func ChallengeMiddleware(conf *config.Config) *jwt.GinJWTMiddleware {
|
||||
}
|
||||
},
|
||||
Unauthorized: unauthorized,
|
||||
Key: []byte(conf.Auth.Secret),
|
||||
Timeout: time.Second * time.Duration(conf.Auth.ChallengeExpireDurationSec),
|
||||
Key: []byte(auth.Secret),
|
||||
Timeout: time.Second * time.Duration(auth.ChallengeExpireDurationSec),
|
||||
TokenLookup: "header: Authorization, query: token, cookie: jwt",
|
||||
TokenHeadName: "Bearer",
|
||||
TimeFunc: time.Now,
|
||||
|
||||
@@ -4,22 +4,57 @@ import (
|
||||
"time"
|
||||
|
||||
jwt "github.com/appleboy/gin-jwt/v2"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/controller/proxy"
|
||||
"scroll-tech/coordinator/internal/types"
|
||||
)
|
||||
|
||||
func nonIdendityAuthorizator(data interface{}, _ *gin.Context) bool {
|
||||
return data != nil
|
||||
}
|
||||
|
||||
// LoginMiddleware jwt auth middleware
|
||||
func LoginMiddleware(conf *config.Config) *jwt.GinJWTMiddleware {
|
||||
func LoginMiddleware(auth *config.Auth) *jwt.GinJWTMiddleware {
|
||||
jwtMiddleware, err := jwt.New(&jwt.GinJWTMiddleware{
|
||||
PayloadFunc: api.Auth.PayloadFunc,
|
||||
IdentityHandler: api.Auth.IdentityHandler,
|
||||
IdentityKey: types.PublicKey,
|
||||
Key: []byte(conf.Auth.Secret),
|
||||
Timeout: time.Second * time.Duration(conf.Auth.LoginExpireDurationSec),
|
||||
Key: []byte(auth.Secret),
|
||||
Timeout: time.Second * time.Duration(auth.LoginExpireDurationSec),
|
||||
Authenticator: api.Auth.Login,
|
||||
Authorizator: nonIdendityAuthorizator,
|
||||
Unauthorized: unauthorized,
|
||||
TokenLookup: "header: Authorization, query: token, cookie: jwt",
|
||||
TokenHeadName: "Bearer",
|
||||
TimeFunc: time.Now,
|
||||
LoginResponse: loginResponse,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Crit("new jwt middleware panic", "error", err)
|
||||
}
|
||||
|
||||
if errInit := jwtMiddleware.MiddlewareInit(); errInit != nil {
|
||||
log.Crit("init jwt middleware panic", "error", errInit)
|
||||
}
|
||||
|
||||
return jwtMiddleware
|
||||
}
|
||||
|
||||
// ProxyLoginMiddleware jwt auth middleware for proxy login
|
||||
func ProxyLoginMiddleware(auth *config.Auth) *jwt.GinJWTMiddleware {
|
||||
jwtMiddleware, err := jwt.New(&jwt.GinJWTMiddleware{
|
||||
PayloadFunc: proxy.Auth.PayloadFunc,
|
||||
IdentityHandler: proxy.Auth.IdentityHandler,
|
||||
IdentityKey: types.PublicKey,
|
||||
Key: []byte(auth.Secret),
|
||||
Timeout: time.Second * time.Duration(auth.LoginExpireDurationSec),
|
||||
Authenticator: proxy.Auth.Login,
|
||||
Authorizator: nonIdendityAuthorizator,
|
||||
Unauthorized: unauthorized,
|
||||
TokenLookup: "header: Authorization, query: token, cookie: jwt",
|
||||
TokenHeadName: "Bearer",
|
||||
|
||||
@@ -28,8 +28,8 @@ func TestMain(m *testing.M) {
|
||||
defer func() {
|
||||
if testApps != nil {
|
||||
testApps.Free()
|
||||
tearDownEnv(t)
|
||||
}
|
||||
tearDownEnv(t)
|
||||
}()
|
||||
m.Run()
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/controller/proxy"
|
||||
"scroll-tech/coordinator/internal/middleware"
|
||||
)
|
||||
|
||||
@@ -25,16 +26,45 @@ func Route(router *gin.Engine, cfg *config.Config, reg prometheus.Registerer) {
|
||||
func v1(router *gin.RouterGroup, conf *config.Config) {
|
||||
r := router.Group("/v1")
|
||||
|
||||
challengeMiddleware := middleware.ChallengeMiddleware(conf)
|
||||
challengeMiddleware := middleware.ChallengeMiddleware(conf.Auth)
|
||||
r.GET("/challenge", challengeMiddleware.LoginHandler)
|
||||
|
||||
loginMiddleware := middleware.LoginMiddleware(conf)
|
||||
loginMiddleware := middleware.LoginMiddleware(conf.Auth)
|
||||
r.POST("/login", challengeMiddleware.MiddlewareFunc(), loginMiddleware.LoginHandler)
|
||||
|
||||
// need jwt token api
|
||||
r.Use(loginMiddleware.MiddlewareFunc())
|
||||
{
|
||||
r.POST("/proxy_login", loginMiddleware.LoginHandler)
|
||||
r.POST("/get_task", api.GetTask.GetTasks)
|
||||
r.POST("/submit_proof", api.SubmitProof.SubmitProof)
|
||||
}
|
||||
}
|
||||
|
||||
// Route register route for coordinator
|
||||
func ProxyRoute(router *gin.Engine, cfg *config.ProxyConfig, reg prometheus.Registerer) {
|
||||
router.Use(gin.Recovery())
|
||||
|
||||
observability.Use(router, "coordinator", reg)
|
||||
|
||||
r := router.Group("coordinator")
|
||||
|
||||
v1_proxy(r, cfg)
|
||||
}
|
||||
|
||||
func v1_proxy(router *gin.RouterGroup, conf *config.ProxyConfig) {
|
||||
r := router.Group("/v1")
|
||||
|
||||
challengeMiddleware := middleware.ChallengeMiddleware(conf.ProxyManager.Auth)
|
||||
r.GET("/challenge", challengeMiddleware.LoginHandler)
|
||||
|
||||
loginMiddleware := middleware.ProxyLoginMiddleware(conf.ProxyManager.Auth)
|
||||
r.POST("/login", challengeMiddleware.MiddlewareFunc(), loginMiddleware.LoginHandler)
|
||||
|
||||
// need jwt token api
|
||||
r.Use(loginMiddleware.MiddlewareFunc())
|
||||
{
|
||||
r.POST("/get_task", proxy.GetTask.GetTasks)
|
||||
r.POST("/submit_proof", proxy.SubmitProof.SubmitProof)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,8 @@ func (r ProverProviderType) String() string {
|
||||
return "prover provider type internal"
|
||||
case ProverProviderTypeExternal:
|
||||
return "prover provider type external"
|
||||
case ProverProviderTypeProxy:
|
||||
return "prover provider type proxy"
|
||||
default:
|
||||
return fmt.Sprintf("prover provider type: %d", r)
|
||||
}
|
||||
@@ -76,4 +78,6 @@ const (
|
||||
ProverProviderTypeInternal
|
||||
// ProverProviderTypeExternal is an external prover provider type
|
||||
ProverProviderTypeExternal
|
||||
// ProverProviderTypeProxy is an proxy prover provider type
|
||||
ProverProviderTypeProxy = 3
|
||||
)
|
||||
|
||||
48
coordinator/internal/types/response_test.go
Normal file
48
coordinator/internal/types/response_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
)
|
||||
|
||||
func TestResponseDecodeData_GetTaskSchema(t *testing.T) {
|
||||
// Arrange: build a dummy payload and wrap it in Response
|
||||
in := GetTaskSchema{
|
||||
UUID: "uuid-123",
|
||||
TaskID: "task-abc",
|
||||
TaskType: 1,
|
||||
UseSnark: true,
|
||||
TaskData: "dummy-data",
|
||||
HardForkName: "cancun",
|
||||
}
|
||||
|
||||
resp := types.Response{
|
||||
ErrCode: 0,
|
||||
ErrMsg: "",
|
||||
Data: in,
|
||||
}
|
||||
|
||||
// Act: JSON round-trip the Response to simulate real HTTP encoding/decoding
|
||||
b, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal response: %v", err)
|
||||
}
|
||||
|
||||
var decoded types.Response
|
||||
if err := json.Unmarshal(b, &decoded); err != nil {
|
||||
t.Fatalf("unmarshal response: %v", err)
|
||||
}
|
||||
|
||||
var out GetTaskSchema
|
||||
if err := decoded.DecodeData(&out); err != nil {
|
||||
t.Fatalf("DecodeData error: %v", err)
|
||||
}
|
||||
|
||||
// Assert: structs match after decode
|
||||
if !reflect.DeepEqual(in, out) {
|
||||
t.Fatalf("decoded struct mismatch:\nwant: %+v\n got: %+v", in, out)
|
||||
}
|
||||
}
|
||||
@@ -30,12 +30,14 @@ import (
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/api"
|
||||
"scroll-tech/coordinator/internal/controller/cron"
|
||||
"scroll-tech/coordinator/internal/controller/proxy"
|
||||
"scroll-tech/coordinator/internal/orm"
|
||||
"scroll-tech/coordinator/internal/route"
|
||||
)
|
||||
|
||||
var (
|
||||
conf *config.Config
|
||||
conf *config.Config
|
||||
proxyConf *config.ProxyConfig
|
||||
|
||||
testApps *testcontainers.TestcontainerApps
|
||||
|
||||
@@ -51,6 +53,9 @@ var (
|
||||
chunk *encoding.Chunk
|
||||
batch *encoding.Batch
|
||||
tokenTimeout int
|
||||
|
||||
envSet bool
|
||||
portUsed map[int64]struct{}
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
@@ -63,18 +68,44 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
func randomURL() string {
|
||||
id, _ := rand.Int(rand.Reader, big.NewInt(2000-1))
|
||||
return fmt.Sprintf("localhost:%d", 10000+2000+id.Int64())
|
||||
return randmURLBatch(1)[0]
|
||||
}
|
||||
|
||||
func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL string) (*cron.Collector, *http.Server) {
|
||||
var err error
|
||||
db, err = testApps.GetGormDBClient()
|
||||
// Generate a batch of random localhost URLs with different ports, similar to randomURL.
|
||||
func randmURLBatch(n int) []string {
|
||||
if n <= 0 {
|
||||
return nil
|
||||
}
|
||||
urls := make([]string, 0, n)
|
||||
if portUsed == nil {
|
||||
portUsed = make(map[int64]struct{})
|
||||
}
|
||||
for len(urls) < n {
|
||||
id, _ := rand.Int(rand.Reader, big.NewInt(2000-1))
|
||||
port := 20000 + 2000 + id.Int64()
|
||||
if _, exist := portUsed[port]; exist {
|
||||
continue
|
||||
}
|
||||
portUsed[port] = struct{}{}
|
||||
urls = append(urls, fmt.Sprintf("localhost:%d", port))
|
||||
}
|
||||
return urls
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
func setupCoordinatorDb(t *testing.T) {
|
||||
var err error
|
||||
assert.NotNil(t, db, "setEnv must be called before")
|
||||
// db, err = testApps.GetGormDBClient()
|
||||
|
||||
// assert.NoError(t, err)
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(sqlDB))
|
||||
}
|
||||
|
||||
func launchCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL string) (*cron.Collector, *http.Server) {
|
||||
|
||||
assert.NotNil(t, db, "db must be set")
|
||||
|
||||
tokenTimeout = 60
|
||||
conf = &config.Config{
|
||||
@@ -114,6 +145,7 @@ func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL stri
|
||||
EuclidV2Time: new(uint64),
|
||||
}, db, nil)
|
||||
route.Route(router, conf, nil)
|
||||
t.Log("coordinator server url", coordinatorURL)
|
||||
srv := &http.Server{
|
||||
Addr: coordinatorURL,
|
||||
Handler: router,
|
||||
@@ -129,7 +161,77 @@ func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL stri
|
||||
return proofCollector, srv
|
||||
}
|
||||
|
||||
func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL string) (*cron.Collector, *http.Server) {
|
||||
setupCoordinatorDb(t)
|
||||
return launchCoordinator(t, proversPerSession, coordinatorURL)
|
||||
}
|
||||
|
||||
func setupProxyDb(t *testing.T) {
|
||||
assert.NotNil(t, db, "setEnv must be called before")
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetModuleDB(sqlDB, "proxy"))
|
||||
}
|
||||
|
||||
func launchProxy(t *testing.T, proxyURL string, coordinatorURL []string, usePersistent bool) *http.Server {
|
||||
var err error
|
||||
assert.NoError(t, err)
|
||||
|
||||
coordinators := make(map[string]*config.UpStream)
|
||||
for i, n := range coordinatorURL {
|
||||
coordinators[fmt.Sprintf("coordinator_%d", i)] = testProxyUpStreamCfg(n)
|
||||
}
|
||||
|
||||
tokenTimeout = 60
|
||||
proxyConf = &config.ProxyConfig{
|
||||
ProxyName: "test_proxy",
|
||||
ProxyManager: &config.ProxyManager{
|
||||
Verifier: &config.VerifierConfig{
|
||||
MinProverVersion: "v4.4.89",
|
||||
Verifiers: []config.AssetConfig{{
|
||||
AssetsPath: "",
|
||||
ForkName: "euclidV2",
|
||||
}},
|
||||
},
|
||||
Client: testProxyClientCfg(),
|
||||
Auth: &config.Auth{
|
||||
Secret: "proxy",
|
||||
ChallengeExpireDurationSec: tokenTimeout,
|
||||
LoginExpireDurationSec: tokenTimeout,
|
||||
},
|
||||
},
|
||||
Coordinators: coordinators,
|
||||
}
|
||||
|
||||
router := gin.New()
|
||||
if usePersistent {
|
||||
proxy.InitController(proxyConf, db, nil)
|
||||
} else {
|
||||
proxy.InitController(proxyConf, nil, nil)
|
||||
}
|
||||
route.ProxyRoute(router, proxyConf, nil)
|
||||
t.Log("proxy server url", proxyURL)
|
||||
srv := &http.Server{
|
||||
Addr: proxyURL,
|
||||
Handler: router,
|
||||
}
|
||||
go func() {
|
||||
runErr := srv.ListenAndServe()
|
||||
if runErr != nil && !errors.Is(runErr, http.ErrServerClosed) {
|
||||
assert.NoError(t, runErr)
|
||||
}
|
||||
}()
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
return srv
|
||||
}
|
||||
|
||||
func setEnv(t *testing.T) {
|
||||
if envSet {
|
||||
t.Log("SetEnv is re-entried")
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
version.Version = "v4.5.45"
|
||||
@@ -146,6 +248,7 @@ func setEnv(t *testing.T) {
|
||||
sqlDB, err := db.DB()
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(sqlDB))
|
||||
assert.NoError(t, migrate.MigrateModule(sqlDB, "proxy"))
|
||||
|
||||
batchOrm = orm.NewBatch(db)
|
||||
chunkOrm = orm.NewChunk(db)
|
||||
@@ -169,6 +272,7 @@ func setEnv(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
batch = &encoding.Batch{Chunks: []*encoding.Chunk{chunk}}
|
||||
|
||||
envSet = true
|
||||
}
|
||||
|
||||
func TestApis(t *testing.T) {
|
||||
|
||||
@@ -34,6 +34,8 @@ type mockProver struct {
|
||||
privKey *ecdsa.PrivateKey
|
||||
proofType message.ProofType
|
||||
coordinatorURL string
|
||||
token string
|
||||
useCacheToken bool
|
||||
}
|
||||
|
||||
func newMockProver(t *testing.T, proverName string, coordinatorURL string, proofType message.ProofType, version string) *mockProver {
|
||||
@@ -50,6 +52,14 @@ func newMockProver(t *testing.T, proverName string, coordinatorURL string, proof
|
||||
return prover
|
||||
}
|
||||
|
||||
func (r *mockProver) resetConnection(coordinatorURL string) {
|
||||
r.coordinatorURL = coordinatorURL
|
||||
}
|
||||
|
||||
func (r *mockProver) setUseCacheToken(enable bool) {
|
||||
r.useCacheToken = enable
|
||||
}
|
||||
|
||||
// connectToCoordinator sets up a websocket client to connect to the prover manager.
|
||||
func (r *mockProver) connectToCoordinator(t *testing.T, proverTypes []types.ProverType) (string, int, string) {
|
||||
challengeString := r.challenge(t)
|
||||
@@ -115,6 +125,7 @@ func (r *mockProver) login(t *testing.T, challengeString string, proverTypes []t
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, http.StatusOK, resp.StatusCode())
|
||||
assert.Empty(t, result.ErrMsg)
|
||||
r.token = loginData.Token
|
||||
return loginData.Token, 0, ""
|
||||
}
|
||||
|
||||
@@ -144,11 +155,14 @@ func (r *mockProver) healthCheckFailure(t *testing.T) bool {
|
||||
|
||||
func (r *mockProver) getProverTask(t *testing.T, proofType message.ProofType) (*types.GetTaskSchema, int, string) {
|
||||
// get task from coordinator
|
||||
token, errCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(proofType)})
|
||||
if errCode != 0 {
|
||||
return nil, errCode, errMsg
|
||||
if !r.useCacheToken || r.token == "" {
|
||||
token, errCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(proofType)})
|
||||
if errCode != 0 {
|
||||
return nil, errCode, errMsg
|
||||
}
|
||||
assert.NotEmpty(t, token)
|
||||
assert.Equal(t, token, r.token)
|
||||
}
|
||||
assert.NotEmpty(t, token)
|
||||
|
||||
type response struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
@@ -160,7 +174,7 @@ func (r *mockProver) getProverTask(t *testing.T, proofType message.ProofType) (*
|
||||
client := resty.New()
|
||||
resp, err := client.R().
|
||||
SetHeader("Content-Type", "application/json").
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", token)).
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.token)).
|
||||
SetBody(map[string]interface{}{"universal": true, "prover_height": 100, "task_types": []int{int(proofType)}}).
|
||||
SetResult(&result).
|
||||
Post("http://" + r.coordinatorURL + "/coordinator/v1/get_task")
|
||||
@@ -174,11 +188,14 @@ func (r *mockProver) getProverTask(t *testing.T, proofType message.ProofType) (*
|
||||
//nolint:unparam
|
||||
func (r *mockProver) tryGetProverTask(t *testing.T, proofType message.ProofType) (int, string) {
|
||||
// get task from coordinator
|
||||
token, errCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(proofType)})
|
||||
if errCode != 0 {
|
||||
return errCode, errMsg
|
||||
if !r.useCacheToken || r.token == "" {
|
||||
token, errCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(proofType)})
|
||||
if errCode != 0 {
|
||||
return errCode, errMsg
|
||||
}
|
||||
assert.NotEmpty(t, token)
|
||||
assert.Equal(t, token, r.token)
|
||||
}
|
||||
assert.NotEmpty(t, token)
|
||||
|
||||
type response struct {
|
||||
ErrCode int `json:"errcode"`
|
||||
@@ -190,8 +207,8 @@ func (r *mockProver) tryGetProverTask(t *testing.T, proofType message.ProofType)
|
||||
client := resty.New()
|
||||
resp, err := client.R().
|
||||
SetHeader("Content-Type", "application/json").
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", token)).
|
||||
SetBody(map[string]interface{}{"prover_height": 100, "task_type": int(proofType), "universal": true}).
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.token)).
|
||||
SetBody(map[string]interface{}{"prover_height": 100, "task_types": []int{int(proofType)}, "universal": true}).
|
||||
SetResult(&result).
|
||||
Post("http://" + r.coordinatorURL + "/coordinator/v1/get_task")
|
||||
assert.NoError(t, err)
|
||||
@@ -249,10 +266,13 @@ func (r *mockProver) submitProof(t *testing.T, proverTaskSchema *types.GetTaskSc
|
||||
Universal: true,
|
||||
}
|
||||
|
||||
token, authErrCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(message.ProofType(proverTaskSchema.TaskType))})
|
||||
assert.Equal(t, authErrCode, 0)
|
||||
assert.Equal(t, errMsg, "")
|
||||
assert.NotEmpty(t, token)
|
||||
if !r.useCacheToken || r.token == "" {
|
||||
token, authErrCode, errMsg := r.connectToCoordinator(t, []types.ProverType{types.MakeProverType(message.ProofType(proverTaskSchema.TaskType))})
|
||||
assert.Equal(t, authErrCode, 0)
|
||||
assert.Equal(t, errMsg, "")
|
||||
assert.NotEmpty(t, token)
|
||||
assert.Equal(t, token, r.token)
|
||||
}
|
||||
|
||||
submitProofData, err := json.Marshal(submitProof)
|
||||
assert.NoError(t, err)
|
||||
@@ -262,7 +282,7 @@ func (r *mockProver) submitProof(t *testing.T, proverTaskSchema *types.GetTaskSc
|
||||
client := resty.New()
|
||||
resp, err := client.R().
|
||||
SetHeader("Content-Type", "application/json").
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", token)).
|
||||
SetHeader("Authorization", fmt.Sprintf("Bearer %s", r.token)).
|
||||
SetBody(string(submitProofData)).
|
||||
SetResult(&result).
|
||||
Post("http://" + r.coordinatorURL + "/coordinator/v1/submit_proof")
|
||||
|
||||
297
coordinator/test/proxy_test.go
Normal file
297
coordinator/test/proxy_test.go
Normal file
@@ -0,0 +1,297 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/da-codec/encoding"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/types/message"
|
||||
"scroll-tech/common/version"
|
||||
|
||||
"scroll-tech/coordinator/internal/config"
|
||||
"scroll-tech/coordinator/internal/controller/proxy"
|
||||
)
|
||||
|
||||
func testProxyClientCfg() *config.ProxyClient {
|
||||
|
||||
return &config.ProxyClient{
|
||||
Secret: "test-secret-key",
|
||||
ProxyName: "test-proxy",
|
||||
ProxyVersion: version.Version,
|
||||
}
|
||||
}
|
||||
|
||||
var testCompatibileMode bool
|
||||
|
||||
func testProxyUpStreamCfg(coordinatorURL string) *config.UpStream {
|
||||
|
||||
return &config.UpStream{
|
||||
BaseUrl: fmt.Sprintf("http://%s", coordinatorURL),
|
||||
RetryWaitTime: 3,
|
||||
ConnectionTimeoutSec: 30,
|
||||
CompatibileMode: testCompatibileMode,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testProxyClient(t *testing.T) {
|
||||
|
||||
// Setup coordinator and http server.
|
||||
coordinatorURL := randomURL()
|
||||
proofCollector, httpHandler := setupCoordinator(t, 1, coordinatorURL)
|
||||
defer func() {
|
||||
proofCollector.Stop()
|
||||
assert.NoError(t, httpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
cliCfg := testProxyClientCfg()
|
||||
upCfg := testProxyUpStreamCfg(coordinatorURL)
|
||||
|
||||
clientManager, err := proxy.NewClientManager("test_coordinator", cliCfg, upCfg)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, clientManager)
|
||||
|
||||
// Create context with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Test Client method
|
||||
client := clientManager.ClientAsProxy(ctx)
|
||||
|
||||
// Client should not be nil if login succeeds
|
||||
// Note: This might be nil if the coordinator is not properly set up for proxy authentication
|
||||
// but the test validates that the Client method completes without panic
|
||||
assert.NotNil(t, client)
|
||||
token1 := client.Token()
|
||||
assert.NotEmpty(t, token1)
|
||||
t.Logf("Client token: %s (%v)", token1, client)
|
||||
|
||||
if !upCfg.CompatibileMode {
|
||||
time.Sleep(time.Second * 2)
|
||||
client.Reset()
|
||||
client = clientManager.ClientAsProxy(ctx)
|
||||
assert.NotNil(t, client)
|
||||
token2 := client.Token()
|
||||
assert.NotEmpty(t, token2)
|
||||
t.Logf("Client token (sec): %s (%v)", token2, client)
|
||||
assert.NotEqual(t, token1, token2, "token should not be identical")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testProxyHandshake(t *testing.T) {
|
||||
// Setup proxy http server.
|
||||
proxyURL := randomURL()
|
||||
proxyHttpHandler := launchProxy(t, proxyURL, []string{}, false)
|
||||
defer func() {
|
||||
assert.NoError(t, proxyHttpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
chunkProver := newMockProver(t, "prover_chunk_test", proxyURL, message.ProofTypeChunk, version.Version)
|
||||
assert.True(t, chunkProver.healthCheckSuccess(t))
|
||||
}
|
||||
|
||||
func testProxyGetTask(t *testing.T) {
|
||||
// Setup coordinator and http server.
|
||||
urls := randmURLBatch(2)
|
||||
coordinatorURL := urls[0]
|
||||
collector, httpHandler := setupCoordinator(t, 3, coordinatorURL)
|
||||
defer func() {
|
||||
collector.Stop()
|
||||
assert.NoError(t, httpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
proxyURL := urls[1]
|
||||
proxyHttpHandler := launchProxy(t, proxyURL, []string{coordinatorURL}, false)
|
||||
defer func() {
|
||||
assert.NoError(t, proxyHttpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
chunkProver := newMockProver(t, "prover_chunk_test", proxyURL, message.ProofTypeChunk, version.Version)
|
||||
chunkProver.setUseCacheToken(true)
|
||||
code, _ := chunkProver.tryGetProverTask(t, message.ProofTypeChunk)
|
||||
assert.Equal(t, int(types.ErrCoordinatorEmptyProofData), code)
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
|
||||
assert.NoError(t, err)
|
||||
dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
|
||||
assert.NoError(t, err)
|
||||
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
|
||||
task, code, msg := chunkProver.getProverTask(t, message.ProofTypeChunk)
|
||||
assert.Empty(t, code)
|
||||
if code == 0 {
|
||||
t.Log("get task id", task.TaskID)
|
||||
} else {
|
||||
t.Log("get task error msg", msg)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testProxyProof(t *testing.T) {
|
||||
urls := randmURLBatch(3)
|
||||
coordinatorURL0 := urls[0]
|
||||
setupCoordinatorDb(t)
|
||||
collector0, httpHandler0 := launchCoordinator(t, 3, coordinatorURL0)
|
||||
defer func() {
|
||||
collector0.Stop()
|
||||
httpHandler0.Shutdown(context.Background())
|
||||
}()
|
||||
coordinatorURL1 := urls[1]
|
||||
collector1, httpHandler1 := launchCoordinator(t, 3, coordinatorURL1)
|
||||
defer func() {
|
||||
collector1.Stop()
|
||||
httpHandler1.Shutdown(context.Background())
|
||||
}()
|
||||
coordinators := map[string]*http.Server{
|
||||
"coordinator_0": httpHandler0,
|
||||
"coordinator_1": httpHandler1,
|
||||
}
|
||||
|
||||
proxyURL := urls[2]
|
||||
proxyHttpHandler := launchProxy(t, proxyURL, []string{coordinatorURL0, coordinatorURL1}, false)
|
||||
defer func() {
|
||||
assert.NoError(t, proxyHttpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
|
||||
assert.NoError(t, err)
|
||||
dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
|
||||
assert.NoError(t, err)
|
||||
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
|
||||
chunkProver := newMockProver(t, "prover_chunk_test", proxyURL, message.ProofTypeChunk, version.Version)
|
||||
chunkProver.setUseCacheToken(true)
|
||||
task, code, msg := chunkProver.getProverTask(t, message.ProofTypeChunk)
|
||||
assert.Empty(t, code)
|
||||
if code == 0 {
|
||||
t.Log("get task", task)
|
||||
parts, _, _ := strings.Cut(task.TaskID, ":")
|
||||
// close the coordinator which do not dispatch task first, so if we submit to wrong target,
|
||||
// there would be a chance the submit failed (to the closed coordinator)
|
||||
for n, srv := range coordinators {
|
||||
if n != parts {
|
||||
t.Log("close coordinator", n)
|
||||
assert.NoError(t, srv.Shutdown(context.Background()))
|
||||
}
|
||||
}
|
||||
exceptProofStatus := verifiedSuccess
|
||||
chunkProver.submitProof(t, task, exceptProofStatus, types.Success)
|
||||
|
||||
} else {
|
||||
t.Log("get task error msg", msg)
|
||||
}
|
||||
|
||||
// verify proof status
|
||||
var (
|
||||
tick = time.Tick(1500 * time.Millisecond)
|
||||
tickStop = time.Tick(time.Minute)
|
||||
)
|
||||
|
||||
var (
|
||||
chunkProofStatus types.ProvingStatus
|
||||
chunkActiveAttempts int16
|
||||
chunkMaxAttempts int16
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
chunkProofStatus, err = chunkOrm.GetProvingStatusByHash(context.Background(), dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
if chunkProofStatus == types.ProvingTaskVerified {
|
||||
return
|
||||
}
|
||||
|
||||
chunkActiveAttempts, chunkMaxAttempts, err = chunkOrm.GetAttemptsByHash(context.Background(), dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, int(chunkMaxAttempts))
|
||||
assert.Equal(t, 0, int(chunkActiveAttempts))
|
||||
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status", "chunkProofStatus", chunkProofStatus.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testProxyPersistent(t *testing.T) {
|
||||
urls := randmURLBatch(4)
|
||||
coordinatorURL0 := urls[0]
|
||||
setupCoordinatorDb(t)
|
||||
collector0, httpHandler0 := launchCoordinator(t, 3, coordinatorURL0)
|
||||
defer func() {
|
||||
collector0.Stop()
|
||||
httpHandler0.Shutdown(context.Background())
|
||||
}()
|
||||
coordinatorURL1 := urls[1]
|
||||
collector1, httpHandler1 := launchCoordinator(t, 3, coordinatorURL1)
|
||||
defer func() {
|
||||
collector1.Stop()
|
||||
httpHandler1.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
setupProxyDb(t)
|
||||
proxyURL1 := urls[2]
|
||||
proxyHttpHandler := launchProxy(t, proxyURL1, []string{coordinatorURL0, coordinatorURL1}, true)
|
||||
defer func() {
|
||||
assert.NoError(t, proxyHttpHandler.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
proxyURL2 := urls[3]
|
||||
proxyHttpHandler2 := launchProxy(t, proxyURL2, []string{coordinatorURL0, coordinatorURL1}, true)
|
||||
defer func() {
|
||||
assert.NoError(t, proxyHttpHandler2.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
err := l2BlockOrm.InsertL2Blocks(context.Background(), []*encoding.Block{block1, block2})
|
||||
assert.NoError(t, err)
|
||||
dbChunk, err := chunkOrm.InsertChunk(context.Background(), chunk)
|
||||
assert.NoError(t, err)
|
||||
err = l2BlockOrm.UpdateChunkHashInRange(context.Background(), 0, 100, dbChunk.Hash)
|
||||
assert.NoError(t, err)
|
||||
|
||||
chunkProver := newMockProver(t, "prover_chunk_test", proxyURL1, message.ProofTypeChunk, version.Version)
|
||||
chunkProver.setUseCacheToken(true)
|
||||
task, _, _ := chunkProver.getProverTask(t, message.ProofTypeChunk)
|
||||
assert.NotNil(t, task)
|
||||
taskFrom, _, _ := strings.Cut(task.TaskID, ":")
|
||||
t.Log("get task from coordinator:", taskFrom)
|
||||
|
||||
chunkProver.resetConnection(proxyURL2)
|
||||
task, _, _ = chunkProver.getProverTask(t, message.ProofTypeChunk)
|
||||
assert.NotNil(t, task)
|
||||
taskFrom2, _, _ := strings.Cut(task.TaskID, ":")
|
||||
assert.Equal(t, taskFrom, taskFrom2)
|
||||
}
|
||||
|
||||
func TestProxyClient(t *testing.T) {
|
||||
testCompatibileMode = false
|
||||
// Set up the test environment.
|
||||
setEnv(t)
|
||||
t.Run("TestProxyClient", testProxyClient)
|
||||
t.Run("TestProxyHandshake", testProxyHandshake)
|
||||
t.Run("TestProxyGetTask", testProxyGetTask)
|
||||
t.Run("TestProxyValidProof", testProxyProof)
|
||||
t.Run("testProxyPersistent", testProxyPersistent)
|
||||
}
|
||||
|
||||
func TestProxyClientCompatibleMode(t *testing.T) {
|
||||
testCompatibileMode = true
|
||||
// Set up the test environment.
|
||||
setEnv(t)
|
||||
t.Run("TestProxyClient", testProxyClient)
|
||||
t.Run("TestProxyHandshake", testProxyHandshake)
|
||||
t.Run("TestProxyGetTask", testProxyGetTask)
|
||||
t.Run("TestProxyValidProof", testProxyProof)
|
||||
t.Run("testProxyPersistent", testProxyPersistent)
|
||||
}
|
||||
@@ -9,13 +9,14 @@ import (
|
||||
"github.com/pressly/goose/v3"
|
||||
)
|
||||
|
||||
//go:embed migrations/*.sql
|
||||
//go:embed migrations
|
||||
var embedMigrations embed.FS
|
||||
|
||||
// MigrationsDir migration dir
|
||||
const MigrationsDir string = "migrations"
|
||||
|
||||
func init() {
|
||||
// note goose ignore ono-sql files by default so we do not need to specify *.sql
|
||||
goose.SetBaseFS(embedMigrations)
|
||||
goose.SetSequential(true)
|
||||
goose.SetTableName("scroll_migrations")
|
||||
@@ -24,6 +25,41 @@ func init() {
|
||||
goose.SetVerbose(verbose)
|
||||
}
|
||||
|
||||
// MigrateModule migrate db used by other module with specified goose TableName
|
||||
// sql file for that module must be put as a sub-directory under `MigrationsDir`
|
||||
func MigrateModule(db *sql.DB, moduleName string) error {
|
||||
|
||||
goose.SetTableName(moduleName + "_migrations")
|
||||
defer func() {
|
||||
goose.SetTableName("scroll_migrations")
|
||||
}()
|
||||
|
||||
return goose.Up(db, MigrationsDir+"/"+moduleName, goose.WithAllowMissing())
|
||||
}
|
||||
|
||||
// RollbackModule rollback the specified module to the given version
|
||||
func RollbackModule(db *sql.DB, moduleName string, version *int64) error {
|
||||
|
||||
goose.SetTableName(moduleName + "_migrations")
|
||||
defer func() {
|
||||
goose.SetTableName("scroll_migrations")
|
||||
}()
|
||||
moduleDir := MigrationsDir + "/" + moduleName
|
||||
|
||||
if version != nil {
|
||||
return goose.DownTo(db, moduleDir, *version)
|
||||
}
|
||||
return goose.Down(db, moduleDir)
|
||||
}
|
||||
|
||||
// ResetModuleDB clean and migrate db for a module.
|
||||
func ResetModuleDB(db *sql.DB, moduleName string) error {
|
||||
if err := RollbackModule(db, moduleName, new(int64)); err != nil {
|
||||
return err
|
||||
}
|
||||
return MigrateModule(db, moduleName)
|
||||
}
|
||||
|
||||
// Migrate migrate db
|
||||
func Migrate(db *sql.DB) error {
|
||||
//return goose.Up(db, MIGRATIONS_DIR, goose.WithAllowMissing())
|
||||
|
||||
30
database/migrate/migrations/proxy/0001_running_tables.sql
Normal file
30
database/migrate/migrations/proxy/0001_running_tables.sql
Normal file
@@ -0,0 +1,30 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
create table prover_sessions
|
||||
(
|
||||
public_key TEXT NOT NULL,
|
||||
upstream TEXT NOT NULL,
|
||||
up_token TEXT NOT NULL,
|
||||
expired TIMESTAMP(0) NOT NULL,
|
||||
constraint uk_prover_sessions_public_key_upstream unique (public_key, upstream)
|
||||
);
|
||||
|
||||
create index idx_prover_sessions_expired on prover_sessions (expired);
|
||||
|
||||
create table priority_upstream
|
||||
(
|
||||
public_key TEXT NOT NULL,
|
||||
upstream TEXT NOT NULL,
|
||||
update_time TIMESTAMP(0) NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
create unique index idx_priority_upstream_public_key on priority_upstream (public_key);
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
drop index if exists idx_prover_sessions_expired;
|
||||
|
||||
drop table if exists prover_sessions;
|
||||
drop table if exists priority_upstream;
|
||||
-- +goose StatementEnd
|
||||
4
tests/prover-e2e/.gitignore
vendored
4
tests/prover-e2e/.gitignore
vendored
@@ -1,5 +1,3 @@
|
||||
build/*
|
||||
testset.json
|
||||
conf
|
||||
*.log
|
||||
*.txt
|
||||
conf
|
||||
@@ -1,24 +1,16 @@
|
||||
# ProverE2E: A new e2e test tool to setup a local environment for testing coordinator and prover.
|
||||
## A new e2e test tool to setup a local environment for testing coordinator and prover.
|
||||
|
||||
It contains data from some blocks in a specified testnet, and helps to generate a series of chunks/batches/bundles from these blocks, filling the DB for the coordinator, so an e2e test (from chunk to bundle) can be run completely local
|
||||
|
||||
## Prepare
|
||||
Prepare:
|
||||
link the staff dir as "conf" from one of the dir with staff set, currently we have following staff sets:
|
||||
+ sepolia: with blocks from scroll sepolia
|
||||
+ cloak-xen: with blocks from xen sepolia, which is a cloak network
|
||||
|
||||
## Test
|
||||
Steps:
|
||||
1. run `make all` under `tests/prover-e2e`, it would launch a postgreSql db in local docker container, which is ready to be used by coordinator (include some chunks/batches/bundles waiting to be proven)
|
||||
2. setup assets by run `make coordinator_setup`
|
||||
3. come into `coordinator/build/bin` for following steps:
|
||||
+ rename `conf/config.template.json` as `conf/config.json`
|
||||
+ if the `l2.validium_mode` is set to true in `config.json`, the `sequencer.decryption_key` must be set
|
||||
+ launch `coordinator_api` service by executing the file
|
||||
4. come into `zkvm-prover` for following steps:
|
||||
+ copy `config.template.json` to `config.json`,
|
||||
+ set the `sdk_config.coordinator.base_url` field in `config.json`, so zkvm prover would connect with the locally launched coordinator api,
|
||||
for common case the url is `http://localhost:8390` (the default listening port of coordinator api)
|
||||
+ launch `make test_e2e_run`, which would specific prover run locally, connect to the local coordinator api service according to the `config.json`, and prove all tasks being injected to db in step 1.
|
||||
|
||||
## AI Helper
|
||||
The test process can be run with the help of `integration-test-helper` skill (~$1.0 for each full process)
|
||||
3. in `coordinator/build/bin/conf`, update necessary items in `config.template.json` and rename it as `config.json`
|
||||
4. build and launch `coordinator_api` service locally
|
||||
5. setup the `config.json` for zkvm prover to connect with the locally launched coordinator api
|
||||
6. in `zkvm-prover`, launch `make test_e2e_run`, which would specific prover run locally, connect to the local coordinator api service according to the `config.json`, and prove all tasks being injected to db in step 1.
|
||||
@@ -1 +0,0 @@
|
||||
Let coordiantor api listen at port 18390 to avoid security restriction or port confliction. Also change the corresponding field in `config.json`
|
||||
1
zkvm-prover/.work/.gitignore
vendored
1
zkvm-prover/.work/.gitignore
vendored
@@ -1,5 +1,4 @@
|
||||
*.vmexe
|
||||
*.elf
|
||||
openvm.toml
|
||||
*.bin
|
||||
*.sol
|
||||
|
||||
Reference in New Issue
Block a user