implement websockets server and swap_subscribeSwap (#96)

This commit is contained in:
noot
2022-02-26 17:58:38 -05:00
committed by GitHub
parent 7710b52cba
commit e81de8cd57
5 changed files with 259 additions and 5 deletions

View File

@@ -38,6 +38,10 @@ const (
defaultRPCPort = 5005
defaultAliceRPCPort = 5001
defaultBobRPCPort = 5002
defaultWSPort = 8080
defaultAliceWSPort = 8081
defaultBobWSPort = 8082
)
var (
@@ -52,6 +56,7 @@ var (
const (
flagRPCPort = "rpc-port"
flagWSPort = "ws-port"
flagBasepath = "basepath"
flagLibp2pKey = "libp2p-key"
flagLibp2pPort = "libp2p-port"
@@ -83,6 +88,10 @@ var (
Name: flagRPCPort,
Usage: "port for the daemon RPC server to run on; default 5001",
},
&cli.UintFlag{
Name: flagWSPort,
Usage: "port for the daemon RPC websockets server to run on; default 8080",
},
&cli.StringFlag{
Name: flagBasepath,
Usage: "path to store swap artefacts",
@@ -288,8 +297,20 @@ func (d *daemon) make(c *cli.Context) error {
rpcPort = defaultRPCPort
}
wsPort := uint16(c.Uint(flagWSPort))
switch {
case wsPort != 0:
case devAlice:
wsPort = defaultAliceWSPort
case devBob:
wsPort = defaultBobWSPort
default:
wsPort = defaultWSPort
}
rpcCfg := &rpc.Config{
Port: rpcPort,
WsPort: wsPort,
Net: host,
Alice: a,
Bob: b,

View File

@@ -210,10 +210,34 @@ Returns:
- `exchangeRate`: the exchange rate of the swap, expressed in a ratio of XMR/ETH.
- `status`: the swap's status, one of `success`, `refunded`, or `aborted`.
Example:
```
curl -X POST http://127.0.0.1:5001 -d '{"jsonrpc":"2.0","id":"0","method":"swap_getPast","params":{"id": 0}}' -H 'Content-Type: application/json'
```
```
{"jsonrpc":"2.0","result":{"provided":"ETH","providedAmount":0.05,"receivedAmount":1,"exchangeRate":20,"status":"success"},"id":"0"}
```
## websocket subscriptions
The daemon also runs a websockets server that can be used to subscribe to push notifications for updates. You can use the command-line tool `wscat` to easily connect to a websockets server.
### `swap_subscribeStatus`
Subscribe to updates of status of a swap. Pushes a notification each time the stage updates, and a final push when the swap completes, containing its completion status.
Paramters:
- `id`: the swap ID.
Returns:
- `stage`: the swap's stage or exit status.
Example:
```
$ wscat -c ws://localhost:8081
# Connected (press CTRL+C to quit)
# > {"jsonrpc":"2.0", "method":"swap_subscribeStatus", "params": {"id": 0}, "id": 0}
# < {"jsonrpc":"2.0","result":{"stage":"ContractDeployed"},"error":null,"id":null}
# < {"jsonrpc":"2.0","result":{"stage":"refunded"},"error":null,"id":null}
```

View File

@@ -59,6 +59,10 @@ func (i *Info) ID() uint64 {
// Provides returns the coin that was provided for this swap.
func (i *Info) Provides() types.ProvidesCoin {
if i == nil {
return ""
}
return i.provides
}

View File

@@ -20,13 +20,16 @@ var log = logging.Logger("rpc")
// Server represents the JSON-RPC server
type Server struct {
s *rpc.Server
port uint16
s *rpc.Server
wsServer *wsServer
port uint16
wsPort uint16
}
// Config ...
type Config struct {
Port uint16
WsPort uint16
Net Net
Alice Alice
Bob Bob
@@ -50,8 +53,10 @@ func NewServer(cfg *Config) (*Server, error) {
}
return &Server{
s: s,
port: cfg.Port,
s: s,
wsServer: newWsServer(cfg.SwapManager, cfg.Alice, cfg.Bob),
port: cfg.Port,
wsPort: cfg.WsPort,
}, nil
}
@@ -70,7 +75,23 @@ func (s *Server) Start() <-chan error {
log.Infof("starting RPC server on http://localhost:%d", s.port)
if err := http.ListenAndServe(fmt.Sprintf(":%d", s.port), handlers.CORS(headersOk, methodsOk, originsOk)(r)); err != nil { //nolint:lll
log.Errorf("failed to start RPC server: %s", err)
log.Errorf("failed to start http RPC server: %s", err)
errCh <- err
}
}()
go func() {
r := mux.NewRouter()
r.Handle("/", s.wsServer)
headersOk := handlers.AllowedHeaders([]string{"content-type", "username", "password"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "OPTIONS"})
originsOk := handlers.AllowedOrigins([]string{"*"})
log.Infof("starting websockets server on ws://localhost:%d", s.wsPort)
if err := http.ListenAndServe(fmt.Sprintf(":%d", s.wsPort), handlers.CORS(headersOk, methodsOk, originsOk)(r)); err != nil { //nolint:lll
log.Errorf("failed to start websockets RPC server: %s", err)
errCh <- err
}
}()

184
rpc/ws.go Normal file
View File

@@ -0,0 +1,184 @@
package rpc
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/noot/atomic-swap/common"
"github.com/noot/atomic-swap/common/rpcclient"
"github.com/noot/atomic-swap/common/types"
"github.com/gorilla/websocket"
)
const (
defaultJSONRPCVersion = "2.0"
subscribeNewPeer = "net_subscribeNewPeer"
subscribeSwapStatus = "swap_subscribeStatus"
)
var upgrader = websocket.Upgrader{}
type wsServer struct {
sm SwapManager
alice Alice
bob Bob
}
func newWsServer(sm SwapManager, a Alice, b Bob) *wsServer {
return &wsServer{
sm: sm,
alice: a,
bob: b,
}
}
// ServeHTTP ...
func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warnf("failed to update connection to websockets: %s", err)
return
}
defer conn.Close() //nolint:errcheck
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Warnf("failed to read websockets message: %s", err)
break
}
var req *Request
err = json.Unmarshal(message, &req)
if err != nil {
_ = writeError(conn, err)
continue
}
log.Debugf("received message over websockets: %s", message)
err = s.handleRequest(conn, req)
if err != nil {
_ = writeError(conn, err)
}
}
}
// Request represents a JSON-RPC request
type Request struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params map[string]interface{} `json:"params"`
ID uint64 `json:"id"`
}
func (s *wsServer) handleRequest(conn *websocket.Conn, req *Request) error {
switch req.Method {
case subscribeNewPeer:
return errors.New("unimplemented")
case subscribeSwapStatus:
idi, has := req.Params["id"] // TODO: make const
if !has {
return errors.New("params missing id field")
}
id, ok := idi.(float64)
if !ok {
return fmt.Errorf("failed to cast id parameter to float64: got %T", idi)
}
return s.subscribeSwapStatus(conn, uint64(id))
default:
return errors.New("invalid method")
}
}
// SubscribeSwapStatusResponse ...
type SubscribeSwapStatusResponse struct {
Stage string `json:"stage"`
}
// subscribeSwapStatus writes the swap's stage to the connection every time it updates.
// when the swap completes, it writes the final status then closes the connection.
// example: `{"jsonrpc":"2.0", "method":"swap_subscribeStatus", "params": {"id": 0}, "id": 0}`
func (s *wsServer) subscribeSwapStatus(conn *websocket.Conn, id uint64) error {
var prevStage common.Stage
for {
info := s.sm.GetOngoingSwap()
if info == nil {
info = s.sm.GetPastSwap(id)
if info == nil {
return errors.New("unable to find swap with given ID")
}
resp := &SubscribeSwapStatusResponse{
Stage: info.Status().String(),
}
if err := writeResponse(conn, resp); err != nil {
return err
}
return nil
}
var swapState common.SwapState
switch info.Provides() {
case types.ProvidesETH:
swapState = s.alice.GetOngoingSwapState()
case types.ProvidesXMR:
swapState = s.bob.GetOngoingSwapState()
}
if swapState == nil {
// we probably completed the swap, continue to call GetPastSwap
continue
}
currStage := swapState.Stage()
if currStage == prevStage {
time.Sleep(time.Millisecond * 10)
continue
}
resp := &SubscribeSwapStatusResponse{
Stage: currStage.String(),
}
if err := writeResponse(conn, resp); err != nil {
return err
}
prevStage = currStage
}
}
func writeResponse(conn *websocket.Conn, result interface{}) error {
bz, err := json.Marshal(result)
if err != nil {
return err
}
resp := &rpcclient.ServerResponse{
Version: defaultJSONRPCVersion,
Result: bz,
}
return conn.WriteJSON(resp)
}
func writeError(conn *websocket.Conn, err error) error {
resp := &rpcclient.ServerResponse{
Version: defaultJSONRPCVersion,
Error: &rpcclient.Error{
Message: err.Error(),
},
}
return conn.WriteJSON(resp)
}