unified RPC/websockets client (#472)

This commit is contained in:
Dmitry Holodov
2023-05-20 16:29:42 -05:00
committed by GitHub
parent f41c9af432
commit 0ab11176c4
38 changed files with 232 additions and 556 deletions

View File

@@ -52,7 +52,7 @@ func TestBootnode(t *testing.T) {
// Ensure the bootnode fully starts before some basic sanity checks
daemon.WaitForSwapdStart(t, rpcPort)
cli := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", rpcPort))
cli := rpcclient.NewClient(ctx, rpcPort)
versionResp, err := cli.Version()
require.NoError(t, err)
require.NotEmpty(t, versionResp.P2PVersion)

View File

@@ -27,7 +27,6 @@ import (
"github.com/athanorlabs/atomic-swap/net"
"github.com/athanorlabs/atomic-swap/rpc"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
)
const (
@@ -487,20 +486,13 @@ func main() {
}
}
func newRRPClient(ctx *cli.Context) *rpcclient.Client {
func newClient(ctx *cli.Context) *rpcclient.Client {
swapdPort := ctx.Uint(flagSwapdPort)
endpoint := fmt.Sprintf("http://127.0.0.1:%d", swapdPort)
return rpcclient.NewClient(ctx.Context, endpoint)
}
func newWSClient(ctx *cli.Context) (wsclient.WsClient, error) {
swapdPort := ctx.Uint(flagSwapdPort)
endpoint := fmt.Sprintf("ws://127.0.0.1:%d/ws", swapdPort)
return wsclient.NewWsClient(ctx.Context, endpoint)
return rpcclient.NewClient(ctx.Context, uint16(swapdPort))
}
func runAddresses(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.Addresses()
if err != nil {
return err
@@ -517,7 +509,7 @@ func runAddresses(ctx *cli.Context) error {
}
func runPeers(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.Peers()
if err != nil {
return err
@@ -534,7 +526,7 @@ func runPeers(ctx *cli.Context) error {
}
func runBalances(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
request := &rpctypes.BalancesRequest{}
tokens := ctx.StringSlice(flagToken)
@@ -571,7 +563,7 @@ func runBalances(ctx *cli.Context) error {
}
func runETHAddress(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
balances, err := c.Balances(nil)
if err != nil {
return err
@@ -586,7 +578,7 @@ func runETHAddress(ctx *cli.Context) error {
}
func runXMRAddress(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
balances, err := c.Balances(nil)
if err != nil {
return err
@@ -601,7 +593,7 @@ func runXMRAddress(ctx *cli.Context) error {
}
func runDiscover(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
provides := ctx.String(flagProvides)
peerIDs, err := c.Discover(provides, ctx.Uint64(flagSearchTime))
if err != nil {
@@ -624,7 +616,7 @@ func runQuery(ctx *cli.Context) error {
return errInvalidFlagValue(flagPeerID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
res, err := c.Query(peerID)
if err != nil {
return err
@@ -647,7 +639,7 @@ func runQueryAll(ctx *cli.Context) error {
searchTime := ctx.Uint64(flagSearchTime)
c := newRRPClient(ctx)
c := newClient(ctx)
peerOffers, err := c.QueryAll(provides, searchTime)
if err != nil {
return err
@@ -672,7 +664,7 @@ func runQueryAll(ctx *cli.Context) error {
}
func runMake(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
min, err := cliutil.ReadUnsignedDecimalFlag(ctx, flagMinAmount)
if err != nil {
@@ -738,13 +730,9 @@ func runMake(ctx *cli.Context) error {
alwaysUseRelayer := ctx.Bool(flagUseRelayer)
if !ctx.Bool(flagDetached) {
wsc, err := newWSClient(ctx) //nolint:govet
if err != nil {
return err
}
defer wsc.Close()
wsc := newClient(ctx)
resp, statusCh, err := wsc.MakeOfferAndSubscribe(
resp, statusCh, err := wsc.MakeOfferAndSubscribe( //nolint:govet
min,
max,
exchangeRate,
@@ -792,11 +780,7 @@ func runTake(ctx *cli.Context) error {
}
if !ctx.Bool(flagDetached) {
wsc, err := newWSClient(ctx)
if err != nil {
return err
}
defer wsc.Close()
wsc := newClient(ctx)
statusCh, err := wsc.TakeOfferAndSubscribe(peerID, offerID, providesAmount)
if err != nil {
@@ -815,7 +799,7 @@ func runTake(ctx *cli.Context) error {
return nil
}
c := newRRPClient(ctx)
c := newClient(ctx)
if err := c.TakeOffer(peerID, offerID, providesAmount); err != nil {
return err
}
@@ -835,7 +819,7 @@ func runGetOngoingSwap(ctx *cli.Context) error {
offerID = &hash
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetOngoingSwap(offerID)
if err != nil {
return err
@@ -885,7 +869,7 @@ func runGetPastSwap(ctx *cli.Context) error {
offerID = &hash
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetPastSwap(offerID)
if err != nil {
return err
@@ -930,7 +914,7 @@ func runCancel(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
fmt.Printf("Attempting to exit swap with id %s\n", offerID)
resp, err := c.Cancel(offerID)
if err != nil {
@@ -942,7 +926,7 @@ func runCancel(ctx *cli.Context) error {
}
func runClearOffers(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
ids := ctx.String(flagOfferIDs)
if ids == "" {
@@ -973,7 +957,7 @@ func runClearOffers(ctx *cli.Context) error {
}
func runGetOffers(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetOffers()
if err != nil {
return err
@@ -1000,7 +984,7 @@ func runGetStatus(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetStatus(offerID)
if err != nil {
return err
@@ -1017,7 +1001,7 @@ func runClaim(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.Claim(offerID)
if err != nil {
return err
@@ -1033,7 +1017,7 @@ func runRefund(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.Refund(offerID)
if err != nil {
return err
@@ -1049,7 +1033,7 @@ func runSetSwapTimeout(ctx *cli.Context) error {
return errNoDuration
}
c := newRRPClient(ctx)
c := newClient(ctx)
err := c.SetSwapTimeout(uint64(duration))
if err != nil {
return err
@@ -1060,7 +1044,7 @@ func runSetSwapTimeout(ctx *cli.Context) error {
}
func runGetSwapTimeout(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetSwapTimeout()
if err != nil {
return err
@@ -1071,7 +1055,7 @@ func runGetSwapTimeout(ctx *cli.Context) error {
}
func runSuggestedExchangeRate(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.SuggestedExchangeRate()
if err != nil {
return err
@@ -1087,7 +1071,7 @@ func runSuggestedExchangeRate(ctx *cli.Context) error {
func runGetVersions(ctx *cli.Context) error {
fmt.Printf("swapcli: %s\n", cliutil.GetVersion())
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.Version()
if err != nil {
return err
@@ -1102,7 +1086,7 @@ func runGetVersions(ctx *cli.Context) error {
}
func runShutdown(ctx *cli.Context) error {
c := newRRPClient(ctx)
c := newClient(ctx)
err := c.Shutdown()
if err != nil {
return err
@@ -1116,7 +1100,7 @@ func runGetContractSwapInfo(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetContractSwapInfo(offerID)
if err != nil {
return err
@@ -1144,7 +1128,7 @@ func runGetSwapSecret(ctx *cli.Context) error {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
c := newClient(ctx)
resp, err := c.GetSwapSecret(offerID)
if err != nil {
return err
@@ -1170,7 +1154,7 @@ func runTransferXMR(ctx *cli.Context) error {
return err
}
c := newRRPClient(ctx)
c := newClient(ctx)
req := &rpc.TransferXMRRequest{
To: to,
Amount: amount,
@@ -1198,7 +1182,7 @@ func runSweepXMR(ctx *cli.Context) error {
return err
}
c := newRRPClient(ctx)
c := newClient(ctx)
request := &rpctypes.BalancesRequest{}
balances, err := c.Balances(request)
if err != nil {
@@ -1232,7 +1216,7 @@ func runTransferETH(ctx *cli.Context) error {
return err
}
c := newRRPClient(ctx)
c := newClient(ctx)
req := &rpc.TransferETHRequest{
To: to,
Amount: amount,

View File

@@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"strconv"
"testing"
"time"
@@ -34,7 +33,7 @@ func TestRunSwapcliWithDaemonTests(t *testing.T) {
}
func (s *swapCLITestSuite) rpcEndpoint() *rpcclient.Client {
return rpcclient.NewClient(context.Background(), fmt.Sprintf("http://127.0.0.1:%d", s.conf.RPCPort))
return rpcclient.NewClient(context.Background(), s.conf.RPCPort)
}
func (s *swapCLITestSuite) mockDaiAddr() ethcommon.Address {

View File

@@ -80,7 +80,7 @@ func TestDaemon_DevXMRTaker(t *testing.T) {
// Ensure the daemon fully before we query the contract address
daemon.WaitForSwapdStart(t, rpcPort)
cli := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", rpcPort))
cli := rpcclient.NewClient(ctx, rpcPort)
versionResp, err := cli.Version()
require.NoError(t, err)
@@ -218,7 +218,7 @@ func TestDaemon_PersistOffers(t *testing.T) {
wc.Close() // wallet file stays in place with mined monero
rpcPort := getFreePort(t)
rpcEndpoint := fmt.Sprintf("http://127.0.0.1:%d", rpcPort)
rpcEndpoint := rpcPort
flags := []string{
"testSwapd",

View File

@@ -17,9 +17,6 @@ import (
// JSON RPC method names that we serve on the localhost server
const (
NetDiscover = "net_discover"
NetQueryPeer = "net_queryPeer"
SubscribeNewPeer = "net_subscribeNewPeer"
SubscribeMakeOffer = "net_makeOfferAndSubscribe"
SubscribeTakeOffer = "net_takeOfferAndSubscribe"
SubscribeSwapStatus = "swap_subscribeStatus"

View File

@@ -5,7 +5,6 @@ package daemon
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -17,7 +16,6 @@ import (
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/tests"
)
@@ -41,22 +39,17 @@ func TestAliceDoubleRestartAfterXMRLock(t *testing.T) {
timeout := 7 * time.Minute
ctx, cancel := LaunchDaemons(t, timeout, bobConf, aliceConf)
bws, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
aws, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
// Use an independent context for these clients that will execute across multiple runs of the daemons
bc := rpcclient.NewClient(context.Background(), fmt.Sprintf("http://127.0.0.1:%d", bobConf.RPCPort))
ac := rpcclient.NewClient(context.Background(), fmt.Sprintf("http://127.0.0.1:%d", aliceConf.RPCPort))
bc := rpcclient.NewClient(context.Background(), bobConf.RPCPort)
ac := rpcclient.NewClient(context.Background(), aliceConf.RPCPort)
tokenAddr := GetMockTokens(t, aliceConf.EthereumClient)[MockTether]
tokenAsset := types.EthAsset(tokenAddr)
makeResp, bobStatusCh, err := bws.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, tokenAsset, false)
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, tokenAsset, false)
require.NoError(t, err)
aliceStatusCh, err := aws.TakeOfferAndSubscribe(makeResp.PeerID, makeResp.OfferID, providesAmt)
aliceStatusCh, err := ac.TakeOfferAndSubscribe(makeResp.PeerID, makeResp.OfferID, providesAmt)
require.NoError(t, err)
var statusWG sync.WaitGroup
@@ -129,10 +122,7 @@ func TestAliceDoubleRestartAfterXMRLock(t *testing.T) {
ctx, _ = LaunchDaemons(t, 5*time.Minute, bobConf, aliceConf)
t.Logf("daemons relaunched, checking swap status")
// Give alice a fresh client with a fresh context
aws, err = wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
aliceStatusCh, err = aws.SubscribeSwapStatus(makeResp.OfferID)
aliceStatusCh, err = ac.SubscribeSwapStatus(makeResp.OfferID)
require.NoError(t, err)
t.Logf("subscribed to Alice's swap status")

View File

@@ -4,7 +4,6 @@
package daemon
import (
"fmt"
"sync"
"testing"
"time"
@@ -15,7 +14,6 @@ import (
"github.com/athanorlabs/atomic-swap/ethereum/block"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/tests"
"github.com/ethereum/go-ethereum/crypto"
@@ -24,7 +22,8 @@ import (
"github.com/stretchr/testify/require"
)
// Test if Alice is able to call the Refund() RPC API (used by swapcli refund) immediately after locking her ETH.
// Test if Alice is able to call the Refund() RPC API, used by swapcli refund,
// immediately after locking her ETH.
func TestRunSwapDaemon_ManualRefund(t *testing.T) {
minXMR := coins.StrToDecimal("1")
maxXMR := minXMR
@@ -42,14 +41,10 @@ func TestRunSwapDaemon_ManualRefund(t *testing.T) {
timeout := 7 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, bobConf, aliceConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
acHTTP := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", aliceConf.RPCPort))
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
useRelayer := false
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, useRelayer)
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, false)
require.NoError(t, err)
aliceStatusCh, err := ac.TakeOfferAndSubscribe(makeResp.PeerID, makeResp.OfferID, providesAmt)
@@ -75,7 +70,7 @@ func TestRunSwapDaemon_ManualRefund(t *testing.T) {
// call refund
t.Log("> Alice calling refund")
refundResp, err := acHTTP.Refund(makeResp.OfferID)
refundResp, err := ac.Refund(makeResp.OfferID)
require.NoError(t, err)
ec, err := ethclient.Dial(common.DefaultGanacheEndpoint)
@@ -87,7 +82,7 @@ func TestRunSwapDaemon_ManualRefund(t *testing.T) {
assert.Equal(t, uint64(1), receipt.Status)
// manually trigger exit, since the xmrtaker doesn't watch for Refunded events.
status, err := acHTTP.Cancel(makeResp.OfferID)
status, err := ac.Cancel(makeResp.OfferID)
require.NoError(t, err)
assert.Equal(t, types.CompletedRefund.String(), status.String())
return

View File

@@ -2,7 +2,6 @@ package daemon
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -13,7 +12,7 @@ import (
"github.com/athanorlabs/atomic-swap/coins"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/tests"
)
@@ -34,10 +33,8 @@ func TestXMRNotLockedAndETHRefundedAfterAliceRestarts(t *testing.T) {
// clients use a separate context and will work across server restarts
clientCtx := context.Background()
bc, err := wsclient.NewWsClient(clientCtx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(clientCtx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(clientCtx, bobConf.RPCPort)
ac := rpcclient.NewClient(clientCtx, aliceConf.RPCPort)
// Bob makes an offer
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, false)
@@ -107,11 +104,6 @@ func TestXMRNotLockedAndETHRefundedAfterAliceRestarts(t *testing.T) {
t.Logf("daemons stopped, now re-launching Alice's daemon in isolation")
ctx, cancel = LaunchDaemons(t, 3*time.Minute, aliceConf)
// This is a bug that we need to recreate Alice's websocket client here. Remove this
// code when we fix https://github.com/AthanorLabs/atomic-swap/issues/353.
ac, err = wsclient.NewWsClient(clientCtx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
aliceStatusCh, err = ac.SubscribeSwapStatus(makeResp.OfferID)
require.NoError(t, err)

View File

@@ -1,7 +1,6 @@
package daemon
import (
"fmt"
"sync"
"testing"
"time"
@@ -15,7 +14,6 @@ import (
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/tests"
)
@@ -37,24 +35,21 @@ func TestRunSwapDaemon_ExchangesXMRForERC20Tokens(t *testing.T) {
timeout := 7 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, aliceConf, bobConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
_, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, tokenAsset, false)
require.NoError(t, err)
time.Sleep(250 * time.Millisecond) // offer propagation time
// Have Alice query all the offer information back
aRPC := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", aliceConf.RPCPort))
peersWithOffers, err := aRPC.QueryAll(coins.ProvidesXMR, 3)
peersWithOffers, err := ac.QueryAll(coins.ProvidesXMR, 3)
require.NoError(t, err)
require.Len(t, peersWithOffers, 1)
require.Len(t, peersWithOffers[0].Offers, 1)
peerID := peersWithOffers[0].PeerID
offer := peersWithOffers[0].Offers[0]
tokenInfo, err := aRPC.TokenInfo(offer.EthAsset.Address())
tokenInfo, err := ac.TokenInfo(offer.EthAsset.Address())
require.NoError(t, err)
providesAmt, err := exRate.ToERC20Amount(offer.MaxAmount, tokenInfo)
require.NoError(t, err)
@@ -109,8 +104,7 @@ func TestRunSwapDaemon_ExchangesXMRForERC20Tokens(t *testing.T) {
//
// Check Bob's token balance via RPC method instead of doing it directly
//
bRPC := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", bobConf.RPCPort))
balances, err := bRPC.Balances(&rpctypes.BalancesRequest{TokenAddrs: []ethcommon.Address{tokenAddr}})
balances, err := bc.Balances(&rpctypes.BalancesRequest{TokenAddrs: []ethcommon.Address{tokenAddr}})
require.NoError(t, err)
t.Logf("Balances: %#v", balances)

View File

@@ -29,7 +29,6 @@ import (
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/net"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/tests"
)
@@ -116,10 +115,8 @@ func TestRunSwapDaemon_SwapBobHasNoEth_AliceRelaysClaim(t *testing.T) {
timeout := 7 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, bobConf, aliceConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
useRelayer := false // Bob will use the relayer regardless, because he has no ETH
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, useRelayer)
@@ -209,10 +206,8 @@ func TestRunSwapDaemon_NoRelayersAvailable_Refund(t *testing.T) {
timeout := 8 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, bobConf, aliceConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
useRelayer := false // Bob will use unsuccessfully use the relayer regardless, because he has no ETH
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, useRelayer)
@@ -294,10 +289,8 @@ func TestRunSwapDaemon_CharlieRelays(t *testing.T) {
timeout := 7 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, bobConf, aliceConf, charlieConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
useRelayer := false // Bob will use the relayer regardless, because he has no ETH
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, useRelayer)
@@ -399,11 +392,8 @@ func TestRunSwapDaemon_CharlieIsBroke_AliceRelays(t *testing.T) {
timeout := 7 * time.Minute
ctx, _ := LaunchDaemons(t, timeout, bobConf, aliceConf, charlieConf)
bc, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
ac, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
bc := rpcclient.NewClient(ctx, bobConf.RPCPort)
ac := rpcclient.NewClient(ctx, aliceConf.RPCPort)
useRelayer := false // Bob will use the relayer regardless, because he has no ETH
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, types.EthAssetETH, useRelayer)
require.NoError(t, err)
@@ -473,7 +463,7 @@ func TestRunSwapDaemon_RPC_Version(t *testing.T) {
timeout := time.Minute
ctx, _ := LaunchDaemons(t, timeout, conf)
c := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", conf.RPCPort))
c := rpcclient.NewClient(ctx, conf.RPCPort)
versionResp, err := c.Version()
require.NoError(t, err)
@@ -489,7 +479,7 @@ func TestRunSwapDaemon_RPC_Shutdown(t *testing.T) {
timeout := time.Minute
ctx, _ := LaunchDaemons(t, timeout, conf)
c := rpcclient.NewClient(ctx, fmt.Sprintf("http://127.0.0.1:%d", conf.RPCPort))
c := rpcclient.NewClient(ctx, conf.RPCPort)
err := c.Shutdown()
require.NoError(t, err)

View File

@@ -2,7 +2,6 @@ package daemon
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -14,7 +13,6 @@ import (
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/tests"
)
@@ -38,22 +36,17 @@ func TestAliceStoppedAndRestartedDuringXMRSweep(t *testing.T) {
timeout := 7 * time.Minute
ctx, cancel := LaunchDaemons(t, timeout, bobConf, aliceConf)
bws, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", bobConf.RPCPort))
require.NoError(t, err)
aws, err := wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
// Use an independent context for these clients that will execute across 2 runs of the daemons
bc := rpcclient.NewClient(context.Background(), fmt.Sprintf("http://127.0.0.1:%d", bobConf.RPCPort))
ac := rpcclient.NewClient(context.Background(), fmt.Sprintf("http://127.0.0.1:%d", aliceConf.RPCPort))
bc := rpcclient.NewClient(context.Background(), bobConf.RPCPort)
ac := rpcclient.NewClient(context.Background(), aliceConf.RPCPort)
tokenAddr := GetMockTokens(t, aliceConf.EthereumClient)[MockTether]
tokenAsset := types.EthAsset(tokenAddr)
makeResp, bobStatusCh, err := bws.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, tokenAsset, false)
makeResp, bobStatusCh, err := bc.MakeOfferAndSubscribe(minXMR, maxXMR, exRate, tokenAsset, false)
require.NoError(t, err)
aliceStatusCh, err := aws.TakeOfferAndSubscribe(makeResp.PeerID, makeResp.OfferID, providesAmt)
aliceStatusCh, err := ac.TakeOfferAndSubscribe(makeResp.PeerID, makeResp.OfferID, providesAmt)
require.NoError(t, err)
var statusWG sync.WaitGroup

View File

@@ -109,8 +109,7 @@ func CreateTestBootnode(t *testing.T) (uint16, string) {
}()
WaitForSwapdStart(t, conf.RPCPort)
endpoint := fmt.Sprintf("http://127.0.0.1:%d", conf.RPCPort)
addresses, err := rpcclient.NewClient(ctx, endpoint).Addresses()
addresses, err := rpcclient.NewClient(ctx, conf.RPCPort).Addresses()
require.NoError(t, err)
require.NotEmpty(t, addresses)

View File

@@ -237,38 +237,6 @@ curl -s -X POST http://127.0.0.1:5000 -H 'Content-Type: application/json' -d \
{"jsonrpc":"2.0","result":null,"id":"0"}
```
### `net_takeOfferSync`
Take an advertised swap offer. This call will initiate and execute an atomic swap. It will
not return until the swap has completed, after which it will return whether the swap was
successful or not. **Note:** You must be the ETH holder to take a swap.
Parameters:
- `peerID`: ID of the peer to swap with.
- `offerID`: ID of the swap offer.
- `providesAmount`: amount of ETH you will be providing. Must be between the offer's
`minimumAmount * exchangeRate` and `maximumAmount * exchangeRate`. For example, if the
offer has a minimum of 1 XMR and a maximum of 5 XMR and an exchange rate of 0.1, you
must provide between 0.1 ETH and 0.5 ETH.
Returns:
- `status`: the swap's status, one of `Success`, `Refunded`, or `Aborted`.
Example:
```bash
curl -s -X POST http://127.0.0.1:5000 -H 'Content-Type: application/json' -d \
'{"jsonrpc":"2.0","id":"0","method":"net_takeOfferSync","params":{
"peerID": "12D3KooWGBw6ScWiL6k3pKNT2LR9o6MVh5CtYj1X8E1rdKueYLjv",
"offerID":"0xa7429fdb7ce0c0b19bd2450cb6f8274aa9d86b3e5f9386279e95671c24fd8381",
"providesAmount": "0.03"
}
}'
```
```json
{"jsonrpc":"2.0","result":{"status":"Success"},"id":"0"}
```
## `personal` namespace
### `personal_balances`

View File

@@ -13,7 +13,6 @@ var (
errUnsupportedForBootnode = errors.New("unsupported for bootnode")
// ws errors
errUnimplemented = errors.New("unimplemented")
errInvalidMethod = errors.New("invalid method")
errNamespaceNotEnabled = errors.New("namespace not enabled")
)

View File

@@ -205,47 +205,6 @@ func (s *NetService) takeOffer(makerPeerID peer.ID, offerID types.Hash, provides
return nil
}
// TakeOfferSyncResponse ...
type TakeOfferSyncResponse struct {
Status types.Status `json:"status" validate:"required"`
}
// TakeOfferSync initiates a swap with the given peer by taking an offer they've made.
// It synchronously waits until the swap is completed before returning its status.
func (s *NetService) TakeOfferSync(
_ *http.Request,
req *rpctypes.TakeOfferRequest,
resp *TakeOfferSyncResponse,
) error {
if s.isBootnode {
return errUnsupportedForBootnode
}
if err := s.takeOffer(req.PeerID, req.OfferID, req.ProvidesAmount); err != nil {
return err
}
const checkSwapSleepDuration = time.Millisecond * 100
for {
time.Sleep(checkSwapSleepDuration)
info, err := s.sm.GetPastSwap(req.OfferID)
if err != nil {
return err
}
if info == nil {
continue
}
resp.Status = info.Status
break
}
return nil
}
// MakeOffer creates and advertises a new swap offer.
func (s *NetService) MakeOffer(
_ *http.Request,

View File

@@ -173,14 +173,9 @@ func NewServer(cfg *Config) (*Server, error) {
}, nil
}
// HttpURL returns the URL used for HTTP requests
func (s *Server) HttpURL() string { //nolint:revive
return fmt.Sprintf("http://%s", s.httpServer.Addr)
}
// WsURL returns the URL used for websocket requests
func (s *Server) WsURL() string {
return fmt.Sprintf("ws://%s/ws", s.httpServer.Addr)
// Port returns the localhost port used for HTTP and websocket requests
func (s *Server) Port() uint16 {
return uint16(s.listener.Addr().(*net.TCPAddr).Port)
}
// Start starts the JSON-RPC and Websocket server.
@@ -189,8 +184,7 @@ func (s *Server) Start() error {
return s.ctx.Err()
}
log.Infof("Starting RPC server on %s", s.HttpURL())
log.Infof("Starting websockets server on %s", s.WsURL())
log.Infof("Starting RPC/websockets server on 127.0.0.1:%d", s.Port())
serverErr := make(chan error, 1)
go func() {

View File

@@ -90,42 +90,7 @@ func (s *wsServer) handleRequest(conn *websocket.Conn, req *rpctypes.Request) er
}
return s.handleSigner(s.ctx, conn, params.OfferID, params.EthAddress, params.XMRAddress)
case rpctypes.SubscribeNewPeer:
return errUnimplemented
case rpctypes.NetDiscover:
if s.ns == nil {
return errNamespaceNotEnabled
}
params := new(rpctypes.DiscoverRequest)
if err := vjson.UnmarshalStruct(req.Params, params); err != nil {
return fmt.Errorf("failed to unmarshal parameters: %w", err)
}
resp := new(rpctypes.DiscoverResponse)
err := s.ns.Discover(nil, params, resp)
if err != nil {
return err
}
return writeResponse(conn, resp)
case rpctypes.NetQueryPeer:
if s.ns == nil {
return errNamespaceNotEnabled
}
params := new(rpctypes.QueryPeerRequest)
if err := vjson.UnmarshalStruct(req.Params, params); err != nil {
return fmt.Errorf("failed to unmarshal parameters: %w", err)
}
resp := new(rpctypes.QueryPeerResponse)
err := s.ns.QueryPeer(nil, params, resp)
if err != nil {
return err
}
return writeResponse(conn, resp)
case rpctypes.SubscribeSwapStatus:
params := new(rpctypes.SubscribeSwapStatusRequest)
if err := vjson.UnmarshalStruct(req.Params, params); err != nil {
@@ -291,6 +256,7 @@ func (s *wsServer) subscribeSwapStatus(ctx context.Context, conn *websocket.Conn
statusCh := s.backend.SwapManager().GetStatusChan(offerID)
if !s.sm.HasOngoingSwap(offerID) {
s.backend.SwapManager().DeleteStatusChan(offerID)
return s.writeSwapExitStatus(conn, offerID)
}

View File

@@ -15,7 +15,7 @@ func (c *Client) Addresses() (*rpctypes.AddressesResponse, error) {
res := &rpctypes.AddressesResponse{}
if err := c.Post(method, nil, res); err != nil {
if err := c.post(method, nil, res); err != nil {
return nil, err
}

View File

@@ -19,7 +19,7 @@ func (c *Client) Cancel(offerID types.Hash) (types.Status, error) {
}
res := &rpc.CancelResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return 0, err
}

View File

@@ -2,7 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only
// Package rpcclient provides client libraries for interacting with a local swapd instance using
// the JSON-RPC remote procedure call protocol.
// the JSON-RPC remote procedure call protocol and websockets.
package rpcclient
import (
@@ -38,24 +38,26 @@ var (
// running on the local host of a single use system. TLS and authentication are not
// currently supported.
type Client struct {
ctx context.Context
endpoint string
ctx context.Context
endpoint string
wsEndpoint string
}
// NewClient creates a new JSON-RPC client for the specified endpoint. The passed context
// is used for the full lifetime of the client.
func NewClient(ctx context.Context, endpoint string) *Client {
func NewClient(ctx context.Context, port uint16) *Client {
return &Client{
ctx: ctx,
endpoint: endpoint,
ctx: ctx,
endpoint: fmt.Sprintf("http://127.0.0.1:%d", port),
wsEndpoint: fmt.Sprintf("ws://127.0.0.1:%d/ws", port),
}
}
// Post makes a JSON-RPC call to the client's endpoint, serializing any passed request
// post makes a JSON-RPC call to the client's endpoint, serializing any passed request
// object and deserializing any passed response object from the POST response body. Nil
// can be passed as the request or response when no data needs to be serialized or
// deserialized respectively.
func (c *Client) Post(method string, request any, response any) error {
func (c *Client) post(method string, request any, response any) error {
data, err := json2.EncodeClientRequest(method, request)
if err != nil {
return err

View File

@@ -9,7 +9,7 @@ func (c *Client) Shutdown() error {
const (
method = "daemon_shutdown"
)
if err := c.Post(method, nil, nil); err != nil {
if err := c.post(method, nil, nil); err != nil {
return err
}
return nil
@@ -21,7 +21,7 @@ func (c *Client) Version() (*rpc.VersionResponse, error) {
method = "daemon_version"
)
resp := &rpc.VersionResponse{}
if err := c.Post(method, nil, resp); err != nil {
if err := c.post(method, nil, resp); err != nil {
return nil, err
}
return resp, nil

View File

@@ -21,7 +21,7 @@ func (c *Client) GetContractSwapInfo(offerID types.Hash) (*rpc.GetContractSwapIn
}
res := &rpc.GetContractSwapInfoResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -39,7 +39,7 @@ func (c *Client) GetSwapSecret(offerID types.Hash) (*rpc.GetSwapSecretResponse,
}
res := &rpc.GetSwapSecretResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}

View File

@@ -22,7 +22,7 @@ func (c *Client) Discover(provides string, searchTime uint64) ([]peer.ID, error)
}
res := &rpctypes.DiscoverResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -41,7 +41,7 @@ func (c *Client) QueryAll(provides coins.ProvidesCoin, searchTime uint64) ([]*rp
}
res := &rpctypes.QueryAllResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}

View File

@@ -15,7 +15,7 @@ func (c *Client) GetOffers() (*rpc.GetOffersResponse, error) {
resp := &rpc.GetOffersResponse{}
if err := c.Post(method, nil, resp); err != nil {
if err := c.post(method, nil, resp); err != nil {
return nil, err
}

View File

@@ -31,7 +31,7 @@ func (c *Client) MakeOffer(
}
res := &rpctypes.MakeOfferResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}

View File

@@ -1,7 +1,7 @@
// Copyright 2023 The AthanorLabs/atomic-swap Authors
// SPDX-License-Identifier: LGPL-3.0-only
package rpc
package rpcclient
import (
"context"

View File

@@ -1,7 +1,7 @@
// Copyright 2023 The AthanorLabs/atomic-swap Authors
// SPDX-License-Identifier: LGPL-3.0-only
package rpc
package rpcclient
import (
"testing"
@@ -9,12 +9,13 @@ import (
"github.com/cockroachdb/apd/v3"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/rpc"
"github.com/stretchr/testify/require"
)
func TestNet_Discover(t *testing.T) {
ns := NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
ns := rpc.NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
req := &rpctypes.DiscoverRequest{
Provides: "",
@@ -28,7 +29,7 @@ func TestNet_Discover(t *testing.T) {
}
func TestNet_Query(t *testing.T) {
ns := NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
ns := rpc.NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
req := &rpctypes.QueryPeerRequest{
PeerID: "12D3KooWDqCzbjexHEa8Rut7bzxHFpRMZyDRW1L6TGkL1KY24JH5",
@@ -42,7 +43,7 @@ func TestNet_Query(t *testing.T) {
}
func TestNet_TakeOffer(t *testing.T) {
ns := NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
ns := rpc.NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
req := &rpctypes.TakeOfferRequest{
PeerID: "12D3KooWDqCzbjexHEa8Rut7bzxHFpRMZyDRW1L6TGkL1KY24JH5",
@@ -53,18 +54,3 @@ func TestNet_TakeOffer(t *testing.T) {
err := ns.TakeOffer(nil, req, nil)
require.NoError(t, err)
}
func TestNet_TakeOfferSync(t *testing.T) {
ns := NewNetService(new(mockNet), new(mockXMRTaker), nil, mockSwapManager(t), false)
req := &rpctypes.TakeOfferRequest{
PeerID: "12D3KooWDqCzbjexHEa8Rut7bzxHFpRMZyDRW1L6TGkL1KY24JH5",
OfferID: testSwapID,
ProvidesAmount: apd.New(1, 0),
}
resp := new(TakeOfferSyncResponse)
err := ns.TakeOfferSync(nil, req, resp)
require.NoError(t, err)
}

View File

@@ -15,7 +15,7 @@ func (c *Client) Peers() (*rpctypes.PeersResponse, error) {
res := &rpctypes.PeersResponse{}
if err := c.Post(method, nil, res); err != nil {
if err := c.post(method, nil, res); err != nil {
return nil, err
}

View File

@@ -21,7 +21,7 @@ func (c *Client) SetSwapTimeout(timeoutSeconds uint64) error {
Timeout: timeoutSeconds,
}
if err := c.Post(method, req, nil); err != nil {
if err := c.post(method, req, nil); err != nil {
return err
}
@@ -35,7 +35,7 @@ func (c *Client) GetSwapTimeout() (*rpc.GetSwapTimeoutResponse, error) {
)
swapTimeout := &rpc.GetSwapTimeoutResponse{}
if err := c.Post(method, nil, swapTimeout); err != nil {
if err := c.post(method, nil, swapTimeout); err != nil {
return nil, err
}
@@ -52,7 +52,7 @@ func (c *Client) TokenInfo(tokenAddr ethcommon.Address) (*coins.ERC20TokenInfo,
request := &rpctypes.TokenInfoRequest{TokenAddr: tokenAddr}
tokenInfo := new(rpctypes.TokenInfoResponse)
if err := c.Post(method, request, tokenInfo); err != nil {
if err := c.post(method, request, tokenInfo); err != nil {
return nil, err
}
@@ -66,7 +66,7 @@ func (c *Client) Balances(request *rpctypes.BalancesRequest) (*rpctypes.Balances
)
balances := &rpctypes.BalancesResponse{}
if err := c.Post(method, request, balances); err != nil {
if err := c.post(method, request, balances); err != nil {
return nil, err
}
@@ -80,7 +80,7 @@ func (c *Client) TransferXMR(request *rpc.TransferXMRRequest) (*rpc.TransferXMRR
)
resp := new(rpc.TransferXMRResponse)
if err := c.Post(method, request, resp); err != nil {
if err := c.post(method, request, resp); err != nil {
return nil, err
}
@@ -94,7 +94,7 @@ func (c *Client) SweepXMR(request *rpc.SweepXMRRequest) (*rpc.SweepXMRResponse,
)
resp := new(rpc.SweepXMRResponse)
if err := c.Post(method, request, resp); err != nil {
if err := c.post(method, request, resp); err != nil {
return nil, err
}
@@ -108,7 +108,7 @@ func (c *Client) TransferETH(request *rpc.TransferETHRequest) (*rpc.TransferETHR
)
resp := new(rpc.TransferETHResponse)
if err := c.Post(method, request, resp); err != nil {
if err := c.post(method, request, resp); err != nil {
return nil, err
}

View File

@@ -20,7 +20,7 @@ func (c *Client) Query(who peer.ID) (*rpctypes.QueryPeerResponse, error) {
}
res := &rpctypes.QueryPeerResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}

View File

@@ -22,7 +22,7 @@ func (c *Client) GetOngoingSwap(id *types.Hash) (*rpc.GetOngoingResponse, error)
res := &rpc.GetOngoingResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
return res, nil
@@ -40,7 +40,7 @@ func (c *Client) GetPastSwap(id *types.Hash) (*rpc.GetPastResponse, error) {
res := &rpc.GetPastResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -58,7 +58,7 @@ func (c *Client) GetStatus(id types.Hash) (*rpc.GetStatusResponse, error) {
}
res := &rpc.GetStatusResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -75,7 +75,7 @@ func (c *Client) ClearOffers(offerIDs []types.Hash) error {
OfferIDs: offerIDs,
}
if err := c.Post(method, req, nil); err != nil {
if err := c.post(method, req, nil); err != nil {
return fmt.Errorf("failed to call %s: %w", method, err)
}
@@ -94,7 +94,7 @@ func (c *Client) Claim(offerID types.Hash) (*rpc.ManualTransactionResponse, erro
res := &rpc.ManualTransactionResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -113,7 +113,7 @@ func (c *Client) Refund(offerID types.Hash) (*rpc.ManualTransactionResponse, err
res := &rpc.ManualTransactionResponse{}
if err := c.Post(method, req, res); err != nil {
if err := c.post(method, req, res); err != nil {
return nil, err
}
@@ -127,7 +127,7 @@ func (c *Client) SuggestedExchangeRate() (*rpc.SuggestedExchangeRateResponse, er
)
res := &rpc.SuggestedExchangeRateResponse{}
if err := c.Post(method, nil, res); err != nil {
if err := c.post(method, nil, res); err != nil {
return nil, err
}

View File

@@ -23,7 +23,7 @@ func (c *Client) TakeOffer(peerID peer.ID, offerID types.Hash, providesAmount *a
ProvidesAmount: providesAmount,
}
if err := c.Post(method, req, nil); err != nil {
if err := c.post(method, req, nil); err != nil {
return err
}

View File

@@ -1,57 +1,26 @@
// Copyright 2023 The AthanorLabs/atomic-swap Authors
// SPDX-License-Identifier: LGPL-3.0-only
// Package wsclient provides client libraries for interacting with a local swapd instance
// over web sockets.
package wsclient
package rpcclient
import (
"context"
"fmt"
"sync"
"github.com/cockroachdb/apd/v3"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/athanorlabs/atomic-swap/coins"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/common/vjson"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("rpcclient")
// WsClient ...
type WsClient interface {
Close()
Discover(provides string, searchTime uint64) ([]peer.ID, error)
Query(who peer.ID) (*rpctypes.QueryPeerResponse, error)
SubscribeSwapStatus(id types.Hash) (<-chan types.Status, error)
TakeOfferAndSubscribe(peerID peer.ID, offerID types.Hash, providesAmount *apd.Decimal) (
ch <-chan types.Status,
err error,
)
MakeOfferAndSubscribe(
min *apd.Decimal,
max *apd.Decimal,
exchangeRate *coins.ExchangeRate,
ethAsset types.EthAsset,
useRelayer bool,
) (*rpctypes.MakeOfferResponse, <-chan types.Status, error)
}
type wsClient struct {
wmu sync.Mutex
rmu sync.Mutex
conn *websocket.Conn
}
// NewWsClient ...
func NewWsClient(ctx context.Context, endpoint string) (*wsClient, error) { ///nolint:revive
conn, resp, err := websocket.DefaultDialer.DialContext(ctx, endpoint, nil)
func (c *Client) wsConnect() (*websocket.Conn, error) {
conn, resp, err := websocket.DefaultDialer.DialContext(c.ctx, c.wsEndpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to dial WS endpoint: %w", err)
}
@@ -60,25 +29,15 @@ func NewWsClient(ctx context.Context, endpoint string) (*wsClient, error) { ///n
return nil, err
}
return &wsClient{
conn: conn,
}, nil
return conn, nil
}
func (c *wsClient) Close() {
_ = c.conn.Close()
func (c *Client) writeJSON(conn *websocket.Conn, msg *rpctypes.Request) error {
return conn.WriteJSON(msg)
}
func (c *wsClient) writeJSON(msg *rpctypes.Request) error {
c.wmu.Lock()
defer c.wmu.Unlock()
return c.conn.WriteJSON(msg)
}
func (c *wsClient) read() ([]byte, error) {
c.rmu.Lock()
defer c.rmu.Unlock()
_, message, err := c.conn.ReadMessage()
func (c *Client) read(conn *websocket.Conn) ([]byte, error) {
_, message, err := conn.ReadMessage()
if err != nil {
return nil, err
}
@@ -86,101 +45,9 @@ func (c *wsClient) read() ([]byte, error) {
return message, nil
}
func (c *wsClient) Discover(provides string, searchTime uint64) ([]peer.ID, error) {
params := &rpctypes.DiscoverRequest{
Provides: provides,
SearchTime: searchTime,
}
bz, err := vjson.MarshalStruct(params)
if err != nil {
return nil, err
}
req := &rpctypes.Request{
JSONRPC: rpctypes.DefaultJSONRPCVersion,
Method: rpctypes.NetDiscover,
Params: bz,
ID: 0,
}
if err = c.writeJSON(req); err != nil {
return nil, err
}
message, err := c.read()
if err != nil {
return nil, fmt.Errorf("failed to read websockets message: %s", err)
}
resp := new(rpctypes.Response)
err = vjson.UnmarshalStruct(message, resp)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
if resp.Error != nil {
return nil, fmt.Errorf("websocket server returned error: %w", resp.Error)
}
log.Debugf("received message over websockets: %s", message)
dresp := new(rpctypes.DiscoverResponse)
if err := vjson.UnmarshalStruct(resp.Result, dresp); err != nil {
return nil, fmt.Errorf("failed to unmarshal swap ID response: %s", err)
}
return dresp.PeerIDs, nil
}
func (c *wsClient) Query(id peer.ID) (*rpctypes.QueryPeerResponse, error) {
params := &rpctypes.QueryPeerRequest{
PeerID: id,
}
bz, err := vjson.MarshalStruct(params)
if err != nil {
return nil, err
}
req := &rpctypes.Request{
JSONRPC: rpctypes.DefaultJSONRPCVersion,
Method: rpctypes.NetQueryPeer,
Params: bz,
ID: 0,
}
if err = c.writeJSON(req); err != nil {
return nil, err
}
// read ID from connection
message, err := c.read()
if err != nil {
return nil, fmt.Errorf("failed to read websockets message: %s", err)
}
resp := new(rpctypes.Response)
err = vjson.UnmarshalStruct(message, resp)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
if resp.Error != nil {
return nil, fmt.Errorf("websocket server returned error: %w", resp.Error)
}
log.Debugf("received message over websockets: %s", message)
dresp := new(rpctypes.QueryPeerResponse)
if err := vjson.UnmarshalStruct(resp.Result, dresp); err != nil {
return nil, fmt.Errorf("failed to unmarshal swap ID response: %s", err)
}
return dresp, nil
}
// SubscribeSwapStatus returns a channel that is written to each time the swap's status updates.
// If there is no swap with the given ID, it returns an error.
func (c *wsClient) SubscribeSwapStatus(id types.Hash) (<-chan types.Status, error) {
func (c *Client) SubscribeSwapStatus(id types.Hash) (<-chan types.Status, error) {
params := &rpctypes.SubscribeSwapStatusRequest{
OfferID: id,
}
@@ -197,17 +64,24 @@ func (c *wsClient) SubscribeSwapStatus(id types.Hash) (<-chan types.Status, erro
ID: 0,
}
if err = c.writeJSON(req); err != nil {
conn, err := c.wsConnect()
if err != nil {
return nil, err
}
if err = c.writeJSON(conn, req); err != nil {
_ = conn.Close()
return nil, err
}
respCh := make(chan types.Status)
go func() {
defer func() { _ = conn.Close() }()
defer close(respCh)
for {
message, err := c.read()
message, err := c.read(conn)
if err != nil {
log.Warnf("failed to read websockets message: %s", err)
break
@@ -243,7 +117,9 @@ func (c *wsClient) SubscribeSwapStatus(id types.Hash) (<-chan types.Status, erro
return respCh, nil
}
func (c *wsClient) TakeOfferAndSubscribe(
// TakeOfferAndSubscribe calls the server-side net_takeOfferAndSubscribe method
// to take and offer and get status updates over websockets.
func (c *Client) TakeOfferAndSubscribe(
peerID peer.ID,
offerID types.Hash,
providesAmount *apd.Decimal,
@@ -266,19 +142,27 @@ func (c *wsClient) TakeOfferAndSubscribe(
ID: 0,
}
if err = c.writeJSON(req); err != nil {
conn, err := c.wsConnect()
if err != nil {
return nil, err
}
if err = c.writeJSON(conn, req); err != nil {
_ = conn.Close()
return nil, err
}
// read resp from connection to see if there's an immediate error
status, err := c.readTakeOfferResponse()
status, err := c.readTakeOfferResponse(conn)
if err != nil {
_ = conn.Close()
return nil, err
}
respCh := make(chan types.Status)
go func() {
defer func() { _ = conn.Close() }()
defer close(respCh)
for {
@@ -287,7 +171,7 @@ func (c *wsClient) TakeOfferAndSubscribe(
return
}
status, err = c.readTakeOfferResponse()
status, err = c.readTakeOfferResponse(conn)
if err != nil {
log.Warnf("%s", err)
break
@@ -298,8 +182,8 @@ func (c *wsClient) TakeOfferAndSubscribe(
return respCh, nil
}
func (c *wsClient) readTakeOfferResponse() (types.Status, error) {
message, err := c.read()
func (c *Client) readTakeOfferResponse(conn *websocket.Conn) (types.Status, error) {
message, err := c.read(conn)
if err != nil {
return 0, fmt.Errorf("failed to read websockets message: %s", err)
}
@@ -323,7 +207,9 @@ func (c *wsClient) readTakeOfferResponse() (types.Status, error) {
return statusResp.Status, nil
}
func (c *wsClient) MakeOfferAndSubscribe(
// MakeOfferAndSubscribe calls the server-side net_makeOfferAndSubscribe method
// to make an offer and get status updates over websockets.
func (c *Client) MakeOfferAndSubscribe(
min *apd.Decimal,
max *apd.Decimal,
exchangeRate *coins.ExchangeRate,
@@ -350,39 +236,51 @@ func (c *wsClient) MakeOfferAndSubscribe(
ID: 0,
}
if err = c.writeJSON(req); err != nil {
conn, err := c.wsConnect()
if err != nil {
_ = conn.Close()
return nil, nil, err
}
if err = c.writeJSON(conn, req); err != nil {
_ = conn.Close()
return nil, nil, err
}
// read ID from connection
message, err := c.read()
message, err := c.read(conn)
if err != nil {
_ = conn.Close()
return nil, nil, fmt.Errorf("failed to read websockets message: %s", err)
}
resp := new(rpctypes.Response)
err = vjson.UnmarshalStruct(message, resp)
if err != nil {
_ = conn.Close()
return nil, nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
if resp.Error != nil {
_ = conn.Close()
return nil, nil, fmt.Errorf("websocket server returned error: %w", resp.Error)
}
// read synchronous response (offer ID)
respData := new(rpctypes.MakeOfferResponse)
if err := vjson.UnmarshalStruct(resp.Result, respData); err != nil {
_ = conn.Close()
return nil, nil, fmt.Errorf("failed to unmarshal response: %s", err)
}
respCh := make(chan types.Status)
go func() {
defer func() { _ = conn.Close() }()
defer close(respCh)
for {
message, err := c.read()
message, err := c.read(conn)
if err != nil {
log.Warnf("failed to read websockets message: %s", err)
break

View File

@@ -1,7 +1,7 @@
// Copyright 2023 The AthanorLabs/atomic-swap Authors
// SPDX-License-Identifier: LGPL-3.0-only
package rpc
package rpcclient
import (
"context"
@@ -17,7 +17,7 @@ import (
"github.com/athanorlabs/atomic-swap/coins"
"github.com/athanorlabs/atomic-swap/common"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
"github.com/athanorlabs/atomic-swap/rpc"
)
var (
@@ -26,10 +26,10 @@ var (
testTimeout = time.Second * 5
)
func newServer(t *testing.T) (*Server, *Config) {
func newServer(t *testing.T) (*rpc.Server, *rpc.Config) {
ctx, cancel := context.WithCancel(context.Background())
cfg := &Config{
cfg := &rpc.Config{
Ctx: ctx,
Env: common.Development,
Address: "127.0.0.1:0", // OS assigned port
@@ -37,10 +37,10 @@ func newServer(t *testing.T) (*Server, *Config) {
ProtocolBackend: newMockProtocolBackend(t),
XMRTaker: new(mockXMRTaker),
XMRMaker: new(mockXMRMaker),
Namespaces: AllNamespaces(),
Namespaces: rpc.AllNamespaces(),
}
s, err := NewServer(cfg)
s, err := rpc.NewServer(cfg)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -64,27 +64,27 @@ func newServer(t *testing.T) (*Server, *Config) {
}
func TestSubscribeSwapStatus(t *testing.T) {
ctx := context.Background()
s, _ := newServer(t)
c, err := wsclient.NewWsClient(s.ctx, s.WsURL())
require.NoError(t, err)
c := NewClient(ctx, s.Port())
ch, err := c.SubscribeSwapStatus(testSwapID)
require.NoError(t, err)
select {
case status := <-ch:
require.Equal(t, types.CompletedSuccess, status)
require.Equal(t, types.CompletedSuccess.String(), status.String())
case <-time.After(testTimeout):
t.Fatal("test timed out")
}
}
func TestSubscribeMakeOffer(t *testing.T) {
ctx := context.Background()
s, cfg := newServer(t)
c, err := wsclient.NewWsClient(s.ctx, s.WsURL())
require.NoError(t, err)
c := NewClient(ctx, s.Port())
min := coins.StrToDecimal("0.1")
max := coins.StrToDecimal("1")
@@ -110,8 +110,7 @@ func TestSubscribeTakeOffer(t *testing.T) {
t.Cleanup(func() {
cancel()
})
c, err := wsclient.NewWsClient(cliCtx, s.WsURL())
require.NoError(t, err)
c := NewClient(cliCtx, s.Port())
ch, err := c.TakeOfferAndSubscribe(testPeerID, testSwapID, apd.New(1, 0))
require.NoError(t, err)

View File

@@ -50,7 +50,7 @@ func deployTestERC20(t *testing.T) ethcommon.Address {
MineTransaction(t, ec.Raw(), erc20Tx)
// Query Charlie's Ethereum address
charlieCli := rpcclient.NewClient(ctx, defaultCharlieSwapdEndpoint)
charlieCli := rpcclient.NewClient(ctx, defaultCharlieSwapdPort)
balResp, err := charlieCli.Balances(nil)
require.NoError(t, err)
charlieAddr := balResp.EthAddress
@@ -67,7 +67,7 @@ func deployTestERC20(t *testing.T) ethcommon.Address {
}
// verify that the XMR Taker has exactly 1000 tokens
aliceCli := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
aliceCli := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
balResp, err = aliceCli.Balances(tokenBalReq)
require.NoError(t, err)
require.Equal(t, "1000", balResp.TokenBalances[0].AsStandardString())

View File

@@ -28,7 +28,6 @@ import (
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/monero"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
)
const (
@@ -37,12 +36,9 @@ const (
generateBlocksEnv = "GENERATEBLOCKS"
falseStr = "false"
defaultXMRTakerSwapdEndpoint = "http://localhost:5000"
defaultXMRTakerSwapdWSEndpoint = "ws://localhost:5000/ws"
defaultXMRMakerSwapdEndpoint = "http://localhost:5001"
defaultXMRMakerSwapdWSEndpoint = "ws://localhost:5001/ws"
defaultCharlieSwapdEndpoint = "http://localhost:5002"
defaultCharlieSwapdWSEndpoint = "ws://localhost:5002/ws"
defaultXMRTakerSwapdPort = 5000
defaultXMRMakerSwapdPort = 5001
defaultCharlieSwapdPort = 5002
defaultDiscoverTimeout = 2 // 2 seconds
@@ -79,10 +75,10 @@ func (s *IntegrationTestSuite) SetupTest() {
}
// Reset XMR Maker and Taker between tests, so tests starts in a known state
ac := rpcclient.NewClient(context.Background(), defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(context.Background(), defaultXMRTakerSwapdPort)
err := ac.SetSwapTimeout(defaultSwapTimeout)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(context.Background(), defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(context.Background(), defaultXMRMakerSwapdPort)
err = bc.ClearOffers(nil)
require.NoError(s.T(), err)
}
@@ -94,7 +90,7 @@ func mineMinXMRMakerBalance(t *testing.T, minBalance *coins.PiconeroAmount) {
daemonCli := monerorpc.New(monero.MonerodRegtestEndpoint, nil).Daemon
ctx := context.Background()
for {
balances, err := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint).Balances(nil)
balances, err := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort).Balances(nil)
require.NoError(t, err)
if balances.PiconeroUnlockedBalance.Cmp(minBalance) >= 0 {
break
@@ -110,25 +106,16 @@ func mineMinXMRMakerBalance(t *testing.T, minBalance *coins.PiconeroAmount) {
}
}
func (s *IntegrationTestSuite) newSwapdWSClient(ctx context.Context, endpoint string) wsclient.WsClient {
wsc, err := wsclient.NewWsClient(ctx, endpoint)
require.NoError(s.T(), err)
s.T().Cleanup(func() {
wsc.Close()
})
return wsc
}
func (s *IntegrationTestSuite) TestXMRTaker_Discover() {
ctx := context.Background()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
_, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, types.EthAssetETH, false)
require.NoError(s.T(), err)
// Give offer advertisement time to propagate
require.NoError(s.T(), common.SleepWithContext(ctx, time.Second))
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
peerIDs, err := ac.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(peerIDs))
@@ -136,7 +123,7 @@ func (s *IntegrationTestSuite) TestXMRTaker_Discover() {
func (s *IntegrationTestSuite) TestXMRMaker_Discover() {
ctx := context.Background()
c := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
c := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
peerIDs, err := c.Discover(string(coins.ProvidesETH), defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 0, len(peerIDs))
@@ -148,13 +135,13 @@ func (s *IntegrationTestSuite) TestXMRTaker_Query() {
func (s *IntegrationTestSuite) testXMRTakerQuery(asset types.EthAsset) {
ctx := context.Background()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, asset, false)
require.NoError(s.T(), err)
require.NoError(s.T(), common.SleepWithContext(ctx, time.Second)) // Give offer advertisement time to propagate
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
peerIDs, err := ac.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(peerIDs))
@@ -186,13 +173,12 @@ func (s *IntegrationTestSuite) testSuccessOneSwap(asset types.EthAsset, useRelay
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
min := coins.StrToDecimal("0.21")
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(min, xmrmakerProvideAmount,
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(min, xmrmakerProvideAmount,
exchangeRate, asset, useRelayer)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
beforeResp, err := bc.GetOffers()
require.NoError(s.T(), err)
@@ -225,8 +211,7 @@ func (s *IntegrationTestSuite) testSuccessOneSwap(asset types.EthAsset, useRelay
}
}()
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
// Give offer advertisement time to propagate
require.NoError(s.T(), common.SleepWithContext(ctx, time.Second))
@@ -238,7 +223,7 @@ func (s *IntegrationTestSuite) testSuccessOneSwap(asset types.EthAsset, useRelay
assert.Equal(s.T(), peerIDs[0], offerResp.PeerID)
providesAmt := coins.StrToDecimal("0.05")
takerStatusCh, err := awsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
require.NoError(s.T(), err)
go func() {
@@ -281,12 +266,11 @@ func (s *IntegrationTestSuite) testRefundXMRTakerCancels(asset types.EthAsset) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
exchangeRate, asset, false)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
beforeResp, err := bc.GetOffers()
require.NoError(s.T(), err)
@@ -321,8 +305,7 @@ func (s *IntegrationTestSuite) testRefundXMRTakerCancels(asset types.EthAsset) {
}
}()
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
err = ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
@@ -336,7 +319,7 @@ func (s *IntegrationTestSuite) testRefundXMRTakerCancels(asset types.EthAsset) {
assert.Equal(s.T(), offerResp.PeerID, peerIDs[0])
providesAmt := coins.StrToDecimal("0.05")
takerStatusCh, err := awsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
require.NoError(s.T(), err)
go func() {
@@ -416,10 +399,9 @@ func (s *IntegrationTestSuite) testRefundXMRMakerCancels(
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
exchangeRate, types.EthAssetETH, false)
require.NoError(s.T(), err)
@@ -463,8 +445,7 @@ func (s *IntegrationTestSuite) testRefundXMRMakerCancels(
}
}()
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
err = ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
@@ -476,7 +457,7 @@ func (s *IntegrationTestSuite) testRefundXMRMakerCancels(
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(peerIDs))
providesAmt := coins.StrToDecimal("0.05")
takerStatusCh, err := awsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmt)
require.NoError(s.T(), err)
go func() {
@@ -527,14 +508,13 @@ func (s *IntegrationTestSuite) testAbortXMRTakerCancels(asset types.EthAsset) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
min := coins.StrToDecimal("0.21")
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(min, xmrmakerProvideAmount,
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(min, xmrmakerProvideAmount,
exchangeRate, asset, false)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
beforeResp, err := bc.GetOffers()
require.NoError(s.T(), err)
@@ -566,8 +546,7 @@ func (s *IntegrationTestSuite) testAbortXMRTakerCancels(asset types.EthAsset) {
}
}()
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
// Bob making an offer above only queues the DHT advertisement for the XMR
// namespace (the namespace for swapd hosts providing XMR offers). We need
@@ -581,7 +560,7 @@ func (s *IntegrationTestSuite) testAbortXMRTakerCancels(asset types.EthAsset) {
assert.Equal(s.T(), offerResp.PeerID, peerIDs[0])
amount := coins.StrToDecimal("0.05")
takerStatusCh, err := awsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, amount)
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, amount)
require.NoError(s.T(), err)
go func() {
@@ -636,14 +615,13 @@ func (s *IntegrationTestSuite) testAbortXMRMakerCancels(asset types.EthAsset) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bcli := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
bcli := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount,
exchangeRate, asset, false)
require.NoError(s.T(), err)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
beforeResp, err := bc.GetOffers()
require.NoError(s.T(), err)
@@ -678,18 +656,17 @@ func (s *IntegrationTestSuite) testAbortXMRMakerCancels(asset types.EthAsset) {
}
}()
c := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
wsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
// Give offer advertisement time to propagate
require.NoError(s.T(), common.SleepWithContext(ctx, time.Second))
peerIDs, err := c.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout)
peerIDs, err := ac.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equalf(s.T(), 1, len(peerIDs), "peer count mismatch")
providesAmount := coins.StrToDecimal("0.05")
takerStatusCh, err := wsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount)
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount)
require.NoError(s.T(), err)
go func() {
@@ -738,14 +715,14 @@ func (s *IntegrationTestSuite) testErrorShouldOnlyTakeOfferOnce(asset types.EthA
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, err := bc.MakeOffer(xmrmakerProvideAmount, xmrmakerProvideAmount, exchangeRate, asset, false)
require.NoError(s.T(), err)
// Give offer advertisement time to propagate
require.NoError(s.T(), common.SleepWithContext(ctx, time.Second))
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
peerIDs, err := ac.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout)
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(peerIDs))
@@ -758,10 +735,9 @@ func (s *IntegrationTestSuite) testErrorShouldOnlyTakeOfferOnce(asset types.EthA
go func() {
defer wg.Done()
wsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
providesAmount := coins.StrToDecimal("0.05")
takerStatusCh, err := wsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount) //nolint:govet
takerStatusCh, err := ac.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount) //nolint:govet
if err != nil {
errCh <- err
return
@@ -785,10 +761,10 @@ func (s *IntegrationTestSuite) testErrorShouldOnlyTakeOfferOnce(asset types.EthA
go func() {
defer wg.Done()
wsc := s.newSwapdWSClient(ctx, defaultCharlieSwapdWSEndpoint)
cc := rpcclient.NewClient(ctx, defaultCharlieSwapdPort)
providesAmount := coins.StrToDecimal("0.05")
takerStatusCh, err := wsc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount) //nolint:govet
takerStatusCh, err := cc.TakeOfferAndSubscribe(offerResp.PeerID, offerResp.OfferID, providesAmount) //nolint:govet
if err != nil {
errCh <- err
return
@@ -843,7 +819,7 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdEndpoint)
ac := rpcclient.NewClient(ctx, defaultXMRTakerSwapdPort)
err := ac.SetSwapTimeout(swapTimeout)
require.NoError(s.T(), err)
@@ -857,8 +833,8 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
// Create the XMRMaker offers synchronously
makerTests := make([]*makerTest, numConcurrentSwaps)
for i := 0; i < numConcurrentSwaps; i++ {
bwsc := s.newSwapdWSClient(ctx, defaultXMRMakerSwapdWSEndpoint)
offerResp, statusCh, err := bwsc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount, //nolint:govet
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
offerResp, statusCh, err := bc.MakeOfferAndSubscribe(xmrmakerProvideAmount, xmrmakerProvideAmount, //nolint:govet
exchangeRate, asset, false)
require.NoError(s.T(), err)
@@ -872,7 +848,7 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
}
}
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
bc := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
beforeResp, err := bc.GetOffers()
require.NoError(s.T(), err)
@@ -912,16 +888,13 @@ func (s *IntegrationTestSuite) testSuccessConcurrentSwaps(asset types.EthAsset)
// Create the XMRTakers synchronously
takerTests := make([]*takerTest, numConcurrentSwaps)
for i := 0; i < numConcurrentSwaps; i++ {
awsc := s.newSwapdWSClient(ctx, defaultXMRTakerSwapdWSEndpoint)
// TODO: implement discovery over websockets (#97)
peerIDs, err := ac.Discover(string(coins.ProvidesXMR), defaultDiscoverTimeout) //nolint:govet
require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(peerIDs))
offerID := makerTests[i].offerID
providesAmount := coins.StrToDecimal("0.05")
takerStatusCh, err := awsc.TakeOfferAndSubscribe(peerIDs[0], offerID, providesAmount)
takerStatusCh, err := ac.TakeOfferAndSubscribe(peerIDs[0], offerID, providesAmount)
require.NoError(s.T(), err)
s.T().Logf("XMRTaker[%d] took offer %s", i, offerID)

View File

@@ -1 +0,0 @@
bb3d95d434ac30e2a5804331b63c493c3db01fd6e47d9f49f19078ae91442075269ab76435bef805b8f8f3f2f456e2dc48ff8e37fde560ced09ff13b8cc9ba61

View File

@@ -23,7 +23,7 @@ func (s *IntegrationTestSuite) TestERC20_Success_ClaimRelayer() {
func (s *IntegrationTestSuite) TestXMRMaker_DiscoverRelayer() {
ctx := context.Background()
c := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
c := rpcclient.NewClient(ctx, defaultXMRMakerSwapdPort)
peerIDs, err := c.Discover("relayer", defaultDiscoverTimeout)
require.NoError(s.T(), err)