feat(coordinator): add prover rest api (#716)

Co-authored-by: colinlyguo <colinlyguo@scroll.io>
This commit is contained in:
georgehao
2023-08-04 20:48:17 +08:00
committed by GitHub
parent d11de12637
commit fcf34edbfb
64 changed files with 2366 additions and 15500 deletions

View File

@@ -2,13 +2,17 @@ package app
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"time"
// enable the pprof
_ "net/http/pprof"
"github.com/gin-gonic/gin"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
@@ -20,7 +24,7 @@ import (
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/controller/api"
"scroll-tech/coordinator/internal/controller/cron"
"scroll-tech/coordinator/internal/logic/provermanager"
"scroll-tech/coordinator/internal/route"
)
var app *cli.App
@@ -49,15 +53,12 @@ func action(ctx *cli.Context) error {
}
subCtx, cancel := context.WithCancel(ctx.Context)
db, err := database.InitDB(cfg.DBConfig)
db, err := database.InitDB(cfg.DB)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}
proofCollector := cron.NewCollector(subCtx, db, cfg)
provermanager.InitProverManager(db)
defer func() {
proofCollector.Stop()
cancel()
@@ -66,34 +67,24 @@ func action(ctx *cli.Context) error {
}
}()
router := gin.Default()
api.InitController(cfg, db)
route.Route(router, cfg)
port := ctx.String(httpPortFlag.Name)
srv := &http.Server{
Addr: fmt.Sprintf(":%s", port),
Handler: router,
ReadHeaderTimeout: time.Minute,
}
// Start metrics server.
metrics.Serve(subCtx, ctx)
apis := api.RegisterAPIs(cfg, db)
// Register api and start rpc service.
if ctx.Bool(httpEnabledFlag.Name) {
handler, addr, err := utils.StartHTTPEndpoint(fmt.Sprintf("%s:%d", ctx.String(httpListenAddrFlag.Name), ctx.Int(httpPortFlag.Name)), apis)
if err != nil {
log.Crit("Could not start RPC api", "error", err)
go func() {
if runServerErr := srv.ListenAndServe(); err != nil && !errors.Is(runServerErr, http.ErrServerClosed) {
log.Crit("run coordinator http server failure", "error", runServerErr)
}
defer func() {
_ = handler.Shutdown(ctx.Context)
log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%v/", addr))
}()
log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr))
}
// Register api and start ws service.
if ctx.Bool(wsEnabledFlag.Name) {
handler, addr, err := utils.StartWSEndpoint(fmt.Sprintf("%s:%d", ctx.String(wsListenAddrFlag.Name), ctx.Int(wsPortFlag.Name)), apis, cfg.ProverManagerConfig.CompressionLevel)
if err != nil {
log.Crit("Could not start WS api", "error", err)
}
defer func() {
_ = handler.Shutdown(ctx.Context)
log.Info("WS endpoint closed", "url", fmt.Sprintf("ws://%v/", addr))
}()
log.Info("WS endpoint opened", "url", fmt.Sprintf("ws://%v/", addr))
}
}()
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
@@ -101,7 +92,17 @@ func action(ctx *cli.Context) error {
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
log.Info("start shutdown coordinator server ...")
closeCtx, cancelExit := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelExit()
if err = srv.Shutdown(closeCtx); err != nil {
log.Warn("shutdown coordinator server failure", "error", err)
return nil
}
<-closeCtx.Done()
log.Info("coordinator server exiting success")
return nil
}

View File

@@ -18,7 +18,7 @@ import (
)
var (
wsStartPort int64 = 40000
httpStartPort int64 = 40000
)
// CoordinatorApp coordinator-test client manager.
@@ -29,7 +29,7 @@ type CoordinatorApp struct {
originFile string
coordinatorFile string
WSPort int64
HTTPPort int64
args []string
docker.AppAPI
@@ -39,13 +39,13 @@ type CoordinatorApp struct {
func NewCoordinatorApp(base *docker.App, file string) *CoordinatorApp {
coordinatorFile := fmt.Sprintf("/tmp/%d_coordinator-config.json", base.Timestamp)
port, _ := rand.Int(rand.Reader, big.NewInt(2000))
wsPort := port.Int64() + wsStartPort
httpPort := port.Int64() + httpStartPort
coordinatorApp := &CoordinatorApp{
base: base,
originFile: file,
coordinatorFile: coordinatorFile,
WSPort: wsPort,
args: []string{"--log.debug", "--config", coordinatorFile, "--ws", "--ws.port", strconv.Itoa(int(wsPort))},
HTTPPort: httpPort,
args: []string{"--log.debug", "--config", coordinatorFile, "--http", "--http.port", strconv.Itoa(int(httpPort))},
}
if err := coordinatorApp.MockConfig(true); err != nil {
panic(err)
@@ -67,9 +67,9 @@ func (c *CoordinatorApp) Free() {
_ = os.Remove(c.coordinatorFile)
}
// WSEndpoint returns ws endpoint.
func (c *CoordinatorApp) WSEndpoint() string {
return fmt.Sprintf("ws://localhost:%d", c.WSPort)
// HTTPEndpoint returns ws endpoint.
func (c *CoordinatorApp) HTTPEndpoint() string {
return fmt.Sprintf("http://localhost:%d", c.HTTPPort)
}
// MockConfig creates a new coordinator config.
@@ -80,14 +80,13 @@ func (c *CoordinatorApp) MockConfig(store bool) error {
return err
}
// Reset prover manager config for manager test cases.
cfg.ProverManagerConfig = &coordinatorConfig.ProverManagerConfig{
cfg.ProverManager = &coordinatorConfig.ProverManager{
ProversPerSession: 1,
Verifier: &coordinatorConfig.VerifierConfig{MockMode: true},
CollectionTime: 1,
TokenTimeToLive: 1,
CollectionTimeSec: 60,
}
cfg.DBConfig.DSN = base.DBImg.Endpoint()
cfg.L2Config.ChainID = 111
cfg.DB.DSN = base.DBImg.Endpoint()
cfg.L2.ChainID = 111
c.Config = cfg
if !store {