Cleanup RPC posting (#241)

* Moves RPC posting out of the `rpctypes` package making it a member function of the RPC client type
* Does proper context handling when making http POSTs
* Handles JSON serialization/deserialization in the POST method, simplifying code for callers
* Takes advantage of the Gorilla JSON-RPC 2.0 client library which simplifies error handling
* Updates HTTP transport to use DialContext in place of the deprecated Dial
This commit is contained in:
Dmitry Holodov
2022-11-29 22:36:34 -06:00
committed by GitHub
parent 6cdffdbebf
commit 7dd84a1d77
14 changed files with 152 additions and 394 deletions

View File

@@ -293,7 +293,7 @@ func main() {
func newRRPClient(ctx *cli.Context) *rpcclient.Client {
swapdPort := ctx.Uint(flagSwapdPort)
endpoint := fmt.Sprintf("http://127.0.0.1:%d", swapdPort)
return rpcclient.NewClient(endpoint)
return rpcclient.NewClient(ctx.Context, endpoint)
}
func newWSClient(ctx *cli.Context) (wsclient.WsClient, error) {

View File

@@ -150,8 +150,8 @@ func TestDaemon_PersistOffers(t *testing.T) {
defaultXMRMakerSwapdEndpoint := fmt.Sprintf("http://localhost:%d", defaultXMRMakerRPCPort)
startupTimeout := time.Millisecond * 100
datadir := t.TempDir()
wc := monero.CreateWalletClientWithWalletDir(t, datadir)
dataDir := t.TempDir()
wc := monero.CreateWalletClientWithWalletDir(t, dataDir)
monero.MineMinXMRBalance(t, wc, common.MoneroToPiconero(1))
c := newTestContext(t,
@@ -160,8 +160,8 @@ func TestDaemon_PersistOffers(t *testing.T) {
flagEnv: "dev",
flagDevXMRMaker: true,
flagDeploy: true,
flagDataDir: datadir,
flagMoneroWalletPath: path.Join(datadir, "test-wallet"),
flagDataDir: dataDir,
flagMoneroWalletPath: path.Join(dataDir, "test-wallet"),
},
)
@@ -182,7 +182,7 @@ func TestDaemon_PersistOffers(t *testing.T) {
time.Sleep(startupTimeout) // let the server start
// make an offer
client := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
client := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
balance, err := client.Balances()
require.NoError(t, err)
require.GreaterOrEqual(t, balance.PiconeroUnlockedBalance, common.MoneroToPiconero(1))
@@ -206,6 +206,7 @@ func TestDaemon_PersistOffers(t *testing.T) {
defer func() {
require.NoError(t, d.stop())
}()
client = rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
wg.Add(1)
go func() {

View File

@@ -1,70 +1 @@
package rpctypes
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
)
var (
contentTypeJSON = "application/json"
dialTimeout = 60 * time.Second
httpClientTimeout = 30 * time.Minute
callTimeout = 30 * time.Minute
transport = &http.Transport{
Dial: (&net.Dialer{
Timeout: dialTimeout,
}).Dial,
}
httpClient = &http.Client{
Transport: transport,
Timeout: httpClientTimeout,
}
)
// PostRPC posts a JSON-RPC call to the given endpoint.
func PostRPC(endpoint, method, params string) (*Response, error) {
data := []byte(`{"jsonrpc":"2.0","method":"` + method + `","params":` + params + `,"id":0}`)
buf := &bytes.Buffer{}
_, err := buf.Write(data)
if err != nil {
return nil, err
}
r, err := http.NewRequest("POST", endpoint, buf)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
r.Header.Set("Content-Type", contentTypeJSON)
ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
defer cancel()
r = r.WithContext(ctx)
resp, err := httpClient.Do(r)
if err != nil {
return nil, fmt.Errorf("failed to post request: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
var sv *Response
if err = json.Unmarshal(body, &sv); err != nil {
return nil, err
}
return sv, nil
}

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/rpc"
)
@@ -13,17 +10,9 @@ func (c *Client) Addresses() ([]string, error) {
method = "net_addresses"
)
resp, err := rpctypes.PostRPC(c.endpoint, method, "{}")
if err != nil {
return nil, err
}
res := &rpc.AddressesResponse{}
if resp.Error != nil {
return nil, resp.Error
}
var res *rpc.AddressesResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, nil, res); err != nil {
return nil, err
}

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/rpc"
)
@@ -17,23 +14,9 @@ func (c *Client) Cancel(id string) (types.Status, error) {
req := &rpc.CancelRequest{
OfferID: id,
}
res := &rpc.CancelResponse{}
params, err := json.Marshal(req)
if err != nil {
return 0, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return 0, err
}
if resp.Error != nil {
return 0, resp.Error
}
var res *rpc.CancelResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return 0, err
}

View File

@@ -1,13 +1,85 @@
package rpcclient
// Client represents a swap RPC client, used to interact with a swap daemon via JSON-RPC calls.
import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"time"
"github.com/gorilla/rpc/v2/json2"
)
var (
contentTypeJSON = "application/json"
dialTimeout = 60 * time.Second
httpClientTimeout = 30 * time.Minute
callTimeout = 30 * time.Minute
transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: dialTimeout,
}).DialContext,
}
httpClient = &http.Client{
Transport: transport,
Timeout: httpClientTimeout,
}
)
// Client primarily exists to be a JSON-RPC client to swapd instances, but it can be used
// to POST JSON-RPC requests to any JSON-RPC server. Its current use case assumes swapd is
// running on the local host of a single use system. TLS and authentication are not
// currently supported.
type Client struct {
ctx context.Context
endpoint string
}
// NewClient ...
func NewClient(endpoint string) *Client {
// NewClient creates a new JSON-RPC client for the specified endpoint. The passed context
// is used for the full lifetime of the client.
func NewClient(ctx context.Context, endpoint string) *Client {
return &Client{
ctx: ctx,
endpoint: endpoint,
}
}
// Post makes a JSON-RPC call to the client's endpoint, serialising any passed request
// object and deserializing any passed response object from the POST response body. Nil
// can be passed as the request or response when no data needs to be serialised or
// deserialised respectively.
func (c *Client) Post(method string, request any, response any) error {
data, err := json2.EncodeClientRequest(method, request)
if err != nil {
return err
}
httpReq, err := http.NewRequest("POST", c.endpoint, bytes.NewReader(data))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
httpReq.Header.Set("Content-Type", contentTypeJSON)
ctx, cancel := context.WithTimeout(c.ctx, callTimeout)
defer cancel()
httpReq = httpReq.WithContext(ctx)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to post %q request: %w", method, err)
}
defer func() { _ = httpResp.Body.Close() }()
if response == nil {
return nil
}
if err = json2.DecodeClientResponse(httpResp.Body, response); err != nil {
return fmt.Errorf("failed to read %q response: %w", method, err)
}
return nil
}

View File

@@ -1,8 +1,6 @@
package rpcclient
import (
"encoding/json"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/common/types"
)
@@ -17,23 +15,9 @@ func (c *Client) Discover(provides types.ProvidesCoin, searchTime uint64) ([][]s
Provides: provides,
SearchTime: searchTime,
}
res := &rpctypes.DiscoverResponse{}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
var res *rpctypes.DiscoverResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}
@@ -50,23 +34,9 @@ func (c *Client) QueryAll(provides types.ProvidesCoin, searchTime uint64) ([]*rp
Provides: provides,
SearchTime: searchTime,
}
res := &rpctypes.QueryAllResponse{}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
var res rpctypes.QueryAllResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/rpc"
)
@@ -14,17 +11,9 @@ func (c *Client) GetOffers() ([]*types.Offer, error) {
method = "swap_getOffers"
)
resp, err := rpctypes.PostRPC(c.endpoint, method, "{}")
if err != nil {
return nil, err
}
res := &rpc.GetOffersResponse{}
if resp.Error != nil {
return nil, resp.Error
}
var res *rpc.GetOffersResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, nil, res); err != nil {
return nil, err
}

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"fmt"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
@@ -29,23 +26,9 @@ func (c *Client) MakeOffer(
RelayerEndpoint: relayerEndpoint,
RelayerCommission: relayerCommission,
}
res := &rpctypes.MakeOfferResponse{}
params, err := json.Marshal(req)
if err != nil {
return "", err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return "", err
}
if resp.Error != nil {
return "", fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
var res *rpctypes.MakeOfferResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return "", err
}

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"errors"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/rpc"
)
@@ -18,20 +15,10 @@ func (c *Client) SetSwapTimeout(timeoutSeconds uint64) error {
Timeout: timeoutSeconds,
}
params, err := json.Marshal(req)
if err != nil {
if err := c.Post(method, req, nil); err != nil {
return err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return err
}
if resp.Error != nil {
return resp.Error
}
return nil
}
@@ -41,22 +28,10 @@ func (c *Client) Balances() (*rpctypes.BalancesResponse, error) {
method = "personal_balances"
)
resp, err := rpctypes.PostRPC(c.endpoint, method, "{}")
if err != nil {
balances := &rpctypes.BalancesResponse{}
if err := c.Post(method, nil, balances); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
var balances rpctypes.BalancesResponse
if err = json.Unmarshal(resp.Result, &balances); err != nil {
return nil, err
}
if balances.WeiBalance == nil {
return nil, errors.New("required field wei_balance missing")
}
return &balances, nil
return balances, nil
}

View File

@@ -1,8 +1,6 @@
package rpcclient
import (
"encoding/json"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
)
@@ -15,23 +13,9 @@ func (c *Client) Query(maddr string) (*rpctypes.QueryPeerResponse, error) {
req := &rpctypes.QueryPeerRequest{
Multiaddr: maddr,
}
res := &rpctypes.QueryPeerResponse{}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
var res *rpctypes.QueryPeerResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}

View File

@@ -1,10 +1,8 @@
package rpcclient
import (
"encoding/json"
"fmt"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/rpc"
)
@@ -14,17 +12,9 @@ func (c *Client) GetPastSwapIDs() ([]string, error) {
method = "swap_getPastIDs"
)
resp, err := rpctypes.PostRPC(c.endpoint, method, "{}")
if err != nil {
return nil, err
}
res := &rpc.GetPastIDsResponse{}
if resp.Error != nil {
return nil, fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
var res *rpc.GetPastIDsResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, nil, res); err != nil {
return nil, err
}
@@ -41,25 +31,11 @@ func (c *Client) GetOngoingSwap(id string) (*rpc.GetOngoingResponse, error) {
OfferID: id,
}
params, err := json.Marshal(req)
if err != nil {
res := &rpc.GetOngoingResponse{}
if err := c.Post(method, req, res); err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
var res *rpc.GetOngoingResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
return nil, err
}
return res, nil
}
@@ -73,22 +49,9 @@ func (c *Client) GetPastSwap(id string) (*rpc.GetPastResponse, error) {
OfferID: id,
}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
res := &rpc.GetPastResponse{}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
var res *rpc.GetPastResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}
@@ -104,23 +67,9 @@ func (c *Client) Refund(id string) (*rpc.RefundResponse, error) {
req := &rpc.RefundRequest{
OfferID: id,
}
res := &rpc.RefundResponse{}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
var res *rpc.RefundResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}
@@ -136,23 +85,9 @@ func (c *Client) GetStage(id string) (*rpc.GetStageResponse, error) {
req := &rpc.GetStageRequest{
OfferID: id,
}
res := &rpc.GetStageResponse{}
params, err := json.Marshal(req)
if err != nil {
return nil, err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
var res *rpc.GetStageResponse
if err = json.Unmarshal(resp.Result, &res); err != nil {
if err := c.Post(method, req, res); err != nil {
return nil, err
}
@@ -169,18 +104,8 @@ func (c *Client) ClearOffers(ids []string) error {
IDs: ids,
}
params, err := json.Marshal(req)
if err != nil {
return err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return err
}
if resp.Error != nil {
return fmt.Errorf("failed to call %s: %w", method, resp.Error)
if err := c.Post(method, req, nil); err != nil {
return fmt.Errorf("failed to call %s: %w", method, err)
}
return nil

View File

@@ -1,9 +1,6 @@
package rpcclient
import (
"encoding/json"
"fmt"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
)
@@ -19,19 +16,9 @@ func (c *Client) TakeOffer(maddr string, offerID string, providesAmount float64)
ProvidesAmount: providesAmount,
}
params, err := json.Marshal(req)
if err != nil {
if err := c.Post(method, req, nil); err != nil {
return err
}
resp, err := rpctypes.PostRPC(c.endpoint, method, string(params))
if err != nil {
return err
}
if resp.Error != nil {
return fmt.Errorf("failed to call %s: %w", method, resp.Error)
}
return nil
}

View File

@@ -60,12 +60,13 @@ func (s *IntegrationTestSuite) SetupTest() {
// We need slightly more than xmrmakerProvideAmount for transaction fees
mineMinXMRMakerBalance(s.T(), common.MoneroToPiconero(xmrmakerProvideAmount*2))
}
s.setSwapTimeout(defaultSwapTimeout) // reset between tests, so every test starts in a known state
}
func (s *IntegrationTestSuite) setSwapTimeout(timeoutSeconds uint64) {
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
err := ac.SetSwapTimeout(timeoutSeconds)
// Reset XMR Maker and Taker between tests, so tests starts in a known state
ac := rpcclient.NewClient(context.Background(), defaultXMRTakerSwapdEndpoint)
err := ac.SetSwapTimeout(defaultSwapTimeout)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(context.Background(), defaultXMRMakerSwapdEndpoint)
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}
@@ -74,8 +75,9 @@ func (s *IntegrationTestSuite) setSwapTimeout(timeoutSeconds uint64) {
// running swapd instance instead of interacting with a wallet.
func mineMinXMRMakerBalance(t *testing.T, minBalance common.PiconeroAmount) {
daemonCli := monerorpc.New(monero.MonerodRegtestEndpoint, nil).Daemon
ctx := context.Background()
for {
balances, err := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint).Balances()
balances, err := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint).Balances()
require.NoError(t, err)
if balances.PiconeroUnlockedBalance >= uint64(minBalance) {
break
@@ -101,15 +103,12 @@ func (s *IntegrationTestSuite) newSwapdWSClient(ctx context.Context, endpoint st
}
func (s *IntegrationTestSuite) TestXMRTaker_Discover() {
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
ctx := context.Background()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
_, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, types.EthAssetETH, "", 0)
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
providers, err := ac.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(providers))
@@ -117,7 +116,8 @@ func (s *IntegrationTestSuite) TestXMRTaker_Discover() {
}
func (s *IntegrationTestSuite) TestXMRMaker_Discover() {
c := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
ctx := context.Background()
c := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
providers, err := c.Discover(types.ProvidesETH, defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 0, len(providers))
@@ -128,16 +128,12 @@ func (s *IntegrationTestSuite) TestXMRTaker_Query() {
}
func (s *IntegrationTestSuite) testXMRTakerQuery(asset types.EthAsset) {
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
ctx := context.Background()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offerID, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, asset, "", 0)
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
c := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
c := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
providers, err := c.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
require.NoError(s.T(), err)
@@ -180,13 +176,9 @@ func (s *IntegrationTestSuite) testSuccessOneSwap(
types.ExchangeRate(exchangeRate), asset, relayerEndpoint, relayerCommission)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offersBefore, err := bc.GetOffers()
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
errCh := make(chan error, 2)
@@ -217,7 +209,7 @@ func (s *IntegrationTestSuite) testSuccessOneSwap(
}
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
// TODO: implement discovery over websockets (#97)
@@ -274,13 +266,9 @@ func (s *IntegrationTestSuite) testRefundXMRTakerCancels(asset types.EthAsset) {
types.ExchangeRate(exchangeRate), asset, "", 0)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offersBefore, err := bc.GetOffers()
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
errCh := make(chan error, 2)
@@ -313,10 +301,11 @@ func (s *IntegrationTestSuite) testRefundXMRTakerCancels(asset types.EthAsset) {
}
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
s.setSwapTimeout(swapTimeout)
err = ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
providers, err := ac.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
require.NoError(s.T(), err)
@@ -403,12 +392,8 @@ func (s *IntegrationTestSuite) testRefundXMRMakerCancels( //nolint:unused
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
defer func() {
err := bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
offerID, statusCh, err := bwsc.MakeOfferAndSubscribe(0.1, xmrmakerProvideAmount,
types.ExchangeRate(exchangeRate), types.EthAssetETH, "", 0)
@@ -454,10 +439,11 @@ func (s *IntegrationTestSuite) testRefundXMRMakerCancels( //nolint:unused
}
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
s.setSwapTimeout(swapTimeout)
err = ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
providers, err := ac.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
require.NoError(s.T(), err)
@@ -520,13 +506,9 @@ func (s *IntegrationTestSuite) testAbortXMRTakerCancels(asset types.EthAsset) {
types.ExchangeRate(exchangeRate), asset, "", 0)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offersBefore, err := bc.GetOffers()
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
errCh := make(chan error, 2)
@@ -556,7 +538,7 @@ func (s *IntegrationTestSuite) testAbortXMRTakerCancels(asset types.EthAsset) {
}
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
providers, err := ac.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
@@ -619,20 +601,16 @@ func (s *IntegrationTestSuite) testAbortXMRMakerCancels(asset types.EthAsset) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bcli := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bcli := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
offerID, statusCh, err := bwsc.MakeOfferAndSubscribe(0.1, xmrmakerProvideAmount,
types.ExchangeRate(exchangeRate), asset, "", 0)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offersBefore, err := bc.GetOffers()
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
errCh := make(chan error, 2)
@@ -665,7 +643,7 @@ func (s *IntegrationTestSuite) testAbortXMRMakerCancels(asset types.EthAsset) {
}
}()
c := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
c := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
wsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
providers, err := c.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
@@ -719,16 +697,11 @@ func (s *IntegrationTestSuite) testErrorShouldOnlyTakeOfferOnce(asset types.EthA
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offerID, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, asset, "", 0)
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
providers, err := ac.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
require.NoError(s.T(), err)
@@ -758,7 +731,6 @@ func (s *IntegrationTestSuite) testErrorShouldOnlyTakeOfferOnce(asset types.EthA
if status != types.CompletedSuccess {
errCh <- fmt.Errorf("0th swap did not exit successfully: got %s", status)
cancel()
return
}
@@ -826,7 +798,9 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
s.setSwapTimeout(swapTimeout)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
err := ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
type makerTest struct {
offerID string
@@ -839,7 +813,7 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
makerTests := make([]*makerTest, numConcurrentSwaps)
for i := 0; i < numConcurrentSwaps; i++ {
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
offerID, statusCh, err := bwsc.MakeOfferAndSubscribe(0.1, xmrmakerProvideAmount,
offerID, statusCh, err := bwsc.MakeOfferAndSubscribe(0.1, xmrmakerProvideAmount, //nolint:govet
types.ExchangeRate(exchangeRate), asset, "", 0)
require.NoError(s.T(), err)
@@ -853,13 +827,9 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
}
}
bc := rpcclient.NewClient(defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
offersBefore, err := bc.GetOffers()
require.NoError(s.T(), err)
defer func() {
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}()
var wg sync.WaitGroup
wg.Add(2 * numConcurrentSwaps)
@@ -897,7 +867,6 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
// Create the XMRTakers synchronously
takerTests := make([]*takerTest, numConcurrentSwaps)
for i := 0; i < numConcurrentSwaps; i++ {
ac := rpcclient.NewClient(defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
// TODO: implement discovery over websockets (#97)