libp2p QUIC and NAT traversal features (#251)

Enables additional libp2p NAT traversal features and enables nodes to communicate using UDP/QUIC in addition to TCP. Switches to using libp2p's "routed host". Most commands that previously accepted multiaddress values now accept a Peer IDs instead.
This commit is contained in:
Dmitry Holodov
2022-12-19 18:25:24 -06:00
committed by GitHub
parent 5a4ccb332f
commit ed15ed9c82
60 changed files with 1643 additions and 1342 deletions

View File

@@ -16,3 +16,7 @@ var (
errMustSetRelayerCommission = fmt.Errorf("%s must be set if %s is set", flagRelayerCommission, flagRelayerEndpoint)
errMustSetRelayerEndpoint = fmt.Errorf("%s must be set if %s is set", flagRelayerEndpoint, flagRelayerCommission)
)
func errInvalidFlagValue(flagName string, err error) error {
return fmt.Errorf("invalid value passed to --%s: %w", flagName, err)
}

View File

@@ -8,26 +8,33 @@ import (
"strings"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
"github.com/athanorlabs/atomic-swap/cliutil"
"github.com/athanorlabs/atomic-swap/common"
"github.com/athanorlabs/atomic-swap/common/rpctypes"
"github.com/athanorlabs/atomic-swap/common/types"
"github.com/athanorlabs/atomic-swap/rpcclient"
"github.com/athanorlabs/atomic-swap/rpcclient/wsclient"
)
const (
defaultSwapdPort = 5001
defaultDiscoverSearchTimeSecs = 12
flagSwapdPort = "swapd-port"
flagMinAmount = "min-amount"
flagMaxAmount = "max-amount"
flagPeerID = "peer-id"
flagOfferID = "offer-id"
flagOfferIDs = "offer-ids"
flagExchangeRate = "exchange-rate"
flagProvides = "provides"
flagProvidesAmount = "provides-amount"
flagRelayerCommission = "relayer-commission"
flagRelayerEndpoint = "relayer-endpoint"
flagSearchTime = "search-time"
flagSubscribe = "subscribe"
)
var (
@@ -47,6 +54,15 @@ var (
swapdPortFlag,
},
},
{
Name: "peers",
Aliases: []string{"p"},
Usage: "List peers that are currently connected",
Action: runPeers,
Flags: []cli.Flag{
swapdPortFlag,
},
},
{
Name: "balances",
Aliases: []string{"b"},
@@ -63,13 +79,13 @@ var (
Action: runDiscover,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "provides",
Name: flagProvides,
Usage: fmt.Sprintf("Coin to find providers for: one of [%s, %s]",
types.ProvidesXMR, types.ProvidesETH),
Value: string(types.ProvidesXMR),
},
&cli.UintFlag{
Name: "search-time",
&cli.Uint64Flag{
Name: flagSearchTime,
Usage: "Duration of time to search for, in seconds",
Value: defaultDiscoverSearchTimeSecs,
},
@@ -83,8 +99,8 @@ var (
Action: runQuery,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "multiaddr",
Usage: "Peer's multiaddress, as provided by discover",
Name: flagPeerID,
Usage: "Peer's ID, as provided by discover",
Required: true,
},
swapdPortFlag,
@@ -97,13 +113,13 @@ var (
Action: runQueryAll,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "provides",
Name: flagProvides,
Usage: fmt.Sprintf("Coin to find providers for: one of [%s, %s]",
types.ProvidesXMR, types.ProvidesETH),
Value: string(types.ProvidesXMR),
},
&cli.UintFlag{
Name: "search-time",
&cli.Uint64Flag{
Name: flagSearchTime,
Usage: "Duration of time to search for, in seconds",
Value: defaultDiscoverSearchTimeSecs,
},
@@ -132,7 +148,7 @@ var (
Required: true,
},
&cli.BoolFlag{
Name: "subscribe",
Name: flagSubscribe,
Usage: "Subscribe to push notifications about the swap's status",
},
&cli.StringFlag{
@@ -158,12 +174,12 @@ var (
Action: runTake,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "multiaddr",
Usage: "Peer's multiaddress, as provided by discover",
Name: flagPeerID,
Usage: "Peer's ID, as provided by discover",
Required: true,
},
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of the offer being taken",
Required: true,
},
@@ -173,7 +189,7 @@ var (
Required: true,
},
&cli.BoolFlag{
Name: "subscribe",
Name: flagSubscribe,
Usage: "Subscribe to push notifications about the swap's status",
},
swapdPortFlag,
@@ -191,7 +207,7 @@ var (
Action: runGetOngoingSwap,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of swap to retrieve info for",
Required: true,
},
@@ -204,7 +220,7 @@ var (
Action: runGetPastSwap,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of swap to retrieve info for",
Required: true,
},
@@ -217,7 +233,7 @@ var (
Action: runRefund,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of swap to retrieve info for",
Required: true,
},
@@ -230,7 +246,7 @@ var (
Action: runCancel,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of swap to retrieve info for",
},
swapdPortFlag,
@@ -242,7 +258,7 @@ var (
Action: runClearOffers,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-ids",
Name: flagOfferIDs,
Usage: "A comma-separated list of offer IDs to delete",
},
swapdPortFlag,
@@ -262,7 +278,7 @@ var (
Action: runGetStage,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "offer-id",
Name: flagOfferID,
Usage: "ID of swap to retrieve info for",
Required: true,
},
@@ -295,8 +311,9 @@ var (
swapdPortFlag = &cli.UintFlag{
Name: flagSwapdPort,
Aliases: []string{"p"},
Usage: "RPC port of swap daemon",
Value: defaultSwapdPort,
Value: common.DefaultSwapdPort,
EnvVars: []string{"SWAPD_PORT"},
}
)
@@ -322,12 +339,35 @@ func newWSClient(ctx *cli.Context) (wsclient.WsClient, error) {
func runAddresses(ctx *cli.Context) error {
c := newRRPClient(ctx)
addrs, err := c.Addresses()
resp, err := c.Addresses()
if err != nil {
return err
}
fmt.Printf("Listening addresses: %v\n", addrs)
fmt.Println("Local listening multi-addresses:")
for i, a := range resp.Addrs {
fmt.Printf("%d: %s\n", i+1, a)
}
if len(resp.Addrs) == 0 {
fmt.Println("[none]")
}
return nil
}
func runPeers(ctx *cli.Context) error {
c := newRRPClient(ctx)
resp, err := c.Peers()
if err != nil {
return err
}
fmt.Println("Connected peer multi-addresses:")
for i, a := range resp.Addrs {
fmt.Printf("%d: %s\n", i+1, a)
}
if len(resp.Addrs) == 0 {
fmt.Println("[none]")
}
return nil
}
@@ -350,61 +390,68 @@ func runBalances(ctx *cli.Context) error {
}
func runDiscover(ctx *cli.Context) error {
provides, err := types.NewProvidesCoin(ctx.String("provides"))
provides, err := providesStrToVal(ctx.String(flagProvides))
if err != nil {
return err
}
searchTime := ctx.Uint("search-time")
c := newRRPClient(ctx)
peers, err := c.Discover(provides, uint64(searchTime))
peerIDs, err := c.Discover(provides, ctx.Uint64(flagSearchTime))
if err != nil {
return err
}
for i, peer := range peers {
fmt.Printf("Peer %d: %v\n", i, peer)
for i, peerID := range peerIDs {
fmt.Printf("Peer %d: %v\n", i, peerID)
}
if len(peerIDs) == 0 {
fmt.Println("[none]")
}
return nil
}
func runQuery(ctx *cli.Context) error {
maddr := ctx.String("multiaddr")
peerID, err := peer.Decode(ctx.String(flagPeerID))
if err != nil {
return errInvalidFlagValue(flagPeerID, err)
}
c := newRRPClient(ctx)
res, err := c.Query(maddr)
res, err := c.Query(peerID)
if err != nil {
return err
}
for _, o := range res.Offers {
fmt.Printf("%v\n", o)
for i, o := range res.Offers {
printOffer(o, i, "")
}
return nil
}
func runQueryAll(ctx *cli.Context) error {
provides, err := types.NewProvidesCoin(ctx.String("provides"))
provides, err := providesStrToVal(ctx.String(flagProvides))
if err != nil {
return err
}
searchTime := ctx.Uint("search-time")
searchTime := ctx.Uint64(flagSearchTime)
c := newRRPClient(ctx)
peers, err := c.QueryAll(provides, uint64(searchTime))
peerOffers, err := c.QueryAll(provides, searchTime)
if err != nil {
return err
}
for i, peer := range peers {
for i, po := range peerOffers {
if i > 0 {
fmt.Println("---")
}
fmt.Printf("Peer %d:\n", i)
fmt.Printf("\tMultiaddress: %v\n", peer.Peer)
fmt.Printf("\tOffers:\n")
for _, o := range peer.Offers {
fmt.Printf("\t%v\n", o)
fmt.Printf(" Peer ID: %v\n", po.PeerID)
fmt.Printf(" Offers:\n")
for j, o := range po.Offers {
printOffer(o, j, " ")
}
}
@@ -436,10 +483,6 @@ func runMake(ctx *cli.Context) error {
}
c := newRRPClient(ctx)
ourAddresses, err := c.Addresses()
if err != nil {
return err
}
relayerEndpoint := ctx.String(flagRelayerEndpoint)
relayerCommission := ctx.Float64(flagRelayerCommission)
@@ -459,21 +502,22 @@ func runMake(ctx *cli.Context) error {
return errMustSetRelayerEndpoint
}
printOfferSummary := func(offerID string) {
fmt.Printf("Published offer with ID: %s\n", offerID)
fmt.Printf("On addresses: %v\n", ourAddresses)
fmt.Printf("Takers can provide between %s to %s %s\n",
common.FmtFloat(otherMin), common.FmtFloat(otherMax), ethAsset)
printOfferSummary := func(offerResp *rpctypes.MakeOfferResponse) {
fmt.Println("Published:")
fmt.Printf("\tOffer ID: %s\n", offerResp.OfferID)
fmt.Printf("\tPeer ID: %s\n", offerResp.PeerID)
fmt.Printf("\tTaker Min: %s %s\n", common.FmtFloat(otherMin), ethAsset)
fmt.Printf("\tTaker Max: %s %s\n", common.FmtFloat(otherMax), ethAsset)
}
if ctx.Bool("subscribe") {
wsc, err := newWSClient(ctx) //nolint:govet
if ctx.Bool(flagSubscribe) {
wsc, err := newWSClient(ctx)
if err != nil {
return err
}
defer wsc.Close()
id, statusCh, err := wsc.MakeOfferAndSubscribe(
resp, statusCh, err := wsc.MakeOfferAndSubscribe(
min,
max,
types.ExchangeRate(exchangeRate),
@@ -485,7 +529,7 @@ func runMake(ctx *cli.Context) error {
return err
}
printOfferSummary(id)
printOfferSummary(resp)
for stage := range statusCh {
fmt.Printf("> Stage updated: %s\n", stage)
@@ -497,36 +541,43 @@ func runMake(ctx *cli.Context) error {
return nil
}
id, err := c.MakeOffer(min, max, exchangeRate, ethAsset, relayerEndpoint, relayerCommission)
resp, err := c.MakeOffer(min, max, exchangeRate, ethAsset, relayerEndpoint, relayerCommission)
if err != nil {
return err
}
printOfferSummary(id)
printOfferSummary(resp)
return nil
}
func runTake(ctx *cli.Context) error {
maddr := ctx.String("multiaddr")
offerID := ctx.String("offer-id")
peerID, err := peer.Decode(ctx.String(flagPeerID))
if err != nil {
return errInvalidFlagValue(flagPeerID, err)
}
offerID, err := types.HexToHash(ctx.String(flagOfferID))
if err != nil {
return errInvalidFlagValue(flagOfferID, err)
}
providesAmount := ctx.Float64(flagProvidesAmount)
if providesAmount == 0 {
return errNoProvidesAmount
}
if ctx.Bool("subscribe") {
if ctx.Bool(flagSubscribe) {
wsc, err := newWSClient(ctx)
if err != nil {
return err
}
defer wsc.Close()
statusCh, err := wsc.TakeOfferAndSubscribe(maddr, offerID, providesAmount)
statusCh, err := wsc.TakeOfferAndSubscribe(peerID, offerID, providesAmount)
if err != nil {
return err
}
fmt.Printf("Initiated swap with ID %s\n", offerID)
fmt.Printf("Initiated swap with offer ID %s\n", offerID)
for stage := range statusCh {
fmt.Printf("> Stage updated: %s\n", stage)
@@ -539,12 +590,11 @@ func runTake(ctx *cli.Context) error {
}
c := newRRPClient(ctx)
err := c.TakeOffer(maddr, offerID, providesAmount)
if err != nil {
if err := c.TakeOffer(peerID, offerID, providesAmount); err != nil {
return err
}
fmt.Printf("Initiated swap with ID %s\n", offerID)
fmt.Printf("Initiated swap with offer ID %s\n", offerID)
return nil
}
@@ -555,12 +605,18 @@ func runGetPastSwapIDs(ctx *cli.Context) error {
return err
}
fmt.Printf("Past swap IDs: %v\n", ids)
fmt.Println("Past swap offer IDs:")
for i, id := range ids {
fmt.Printf("%d: %s\n", i, id)
}
if len(ids) == 0 {
fmt.Println("[none]")
}
return nil
}
func runGetOngoingSwap(ctx *cli.Context) error {
offerID := ctx.String("offer-id")
offerID := ctx.String(flagOfferID)
c := newRRPClient(ctx)
info, err := c.GetOngoingSwap(offerID)
@@ -568,7 +624,7 @@ func runGetOngoingSwap(ctx *cli.Context) error {
return err
}
fmt.Printf("Provided: %s\n ProvidedAmount: %v\n ReceivedAmount: %v\n ExchangeRate: %v\n Status: %s\n",
fmt.Printf("Provided: %s\nProvidedAmount: %v\nReceivedAmount: %v\nExchangeRate: %v\nStatus: %s\n",
info.Provided,
info.ProvidedAmount,
info.ReceivedAmount,
@@ -579,7 +635,7 @@ func runGetOngoingSwap(ctx *cli.Context) error {
}
func runGetPastSwap(ctx *cli.Context) error {
offerID := ctx.String("offer-id")
offerID := ctx.String(flagOfferID)
c := newRRPClient(ctx)
info, err := c.GetPastSwap(offerID)
@@ -598,7 +654,7 @@ func runGetPastSwap(ctx *cli.Context) error {
}
func runRefund(ctx *cli.Context) error {
offerID := ctx.String("offer-id")
offerID := ctx.String(flagOfferID)
c := newRRPClient(ctx)
resp, err := c.Refund(offerID)
@@ -611,7 +667,10 @@ func runRefund(ctx *cli.Context) error {
}
func runCancel(ctx *cli.Context) error {
offerID := ctx.String("offer-id")
offerID, err := types.HexToHash(ctx.String(flagOfferID))
if err != nil {
return errInvalidFlagValue(flagOfferID, err)
}
c := newRRPClient(ctx)
resp, err := c.Cancel(offerID)
@@ -626,7 +685,7 @@ func runCancel(ctx *cli.Context) error {
func runClearOffers(ctx *cli.Context) error {
c := newRRPClient(ctx)
ids := ctx.String("offer-ids")
ids := ctx.String(flagOfferIDs)
if ids == "" {
err := c.ClearOffers(nil)
if err != nil {
@@ -637,7 +696,15 @@ func runClearOffers(ctx *cli.Context) error {
return nil
}
err := c.ClearOffers(strings.Split(ids, ","))
var offerIDs []types.Hash
for _, offerIDStr := range strings.Split(ids, ",") {
id, err := types.HexToHash(strings.TrimSpace(offerIDStr))
if err != nil {
return errInvalidFlagValue(flagOfferIDs, err)
}
offerIDs = append(offerIDs, id)
}
err := c.ClearOffers(offerIDs)
if err != nil {
return err
}
@@ -653,16 +720,20 @@ func runGetOffers(ctx *cli.Context) error {
return err
}
fmt.Println("Peer ID (self):", resp.PeerID)
fmt.Println("Offers:")
for _, offer := range resp {
fmt.Printf("\t%v\n", offer)
for i, offer := range resp.Offers {
printOffer(offer, i, " ")
}
if len(resp.Offers) == 0 {
fmt.Println("[no offers]")
}
return nil
}
func runGetStage(ctx *cli.Context) error {
offerID := ctx.String("offer-id")
offerID := ctx.String(flagOfferID)
c := newRRPClient(ctx)
resp, err := c.GetStage(offerID)
@@ -700,3 +771,28 @@ func runGetSwapTimeout(ctx *cli.Context) error {
fmt.Printf("Swap Timeout Duration: %d seconds\n", resp.Timeout)
return nil
}
func printOffer(o *types.Offer, index int, indent string) {
if index > 0 {
fmt.Printf("%s---\n", indent)
}
fmt.Printf("%sOffer ID: %s\n", indent, o.ID)
fmt.Printf("%sProvides: %s\n", indent, o.Provides)
fmt.Printf("%sMin Amount: %s\n", indent, common.FmtFloat(o.MinAmount))
fmt.Printf("%sMax Amount: %s\n", indent, common.FmtFloat(o.MaxAmount))
fmt.Printf("%sExchange Rate: %s\n", indent, common.FmtFloat(float64(o.ExchangeRate)))
fmt.Printf("%sETH Asset: %s\n", indent, o.EthAsset)
}
func providesStrToVal(providesStr string) (types.ProvidesCoin, error) {
var provides types.ProvidesCoin
// The provides flag value defaults to XMR, but the user can still specify the empty
// string explicitly, which they can do to search the empty DHT namespace for all
// peers. `NewProvidesCoin` gives an error if you pass the empty string, so we
// special case the empty string.
if providesStr == "" {
return provides, nil
}
return types.NewProvidesCoin(providesStr)
}

View File

@@ -40,9 +40,9 @@ const (
defaultXMRMakerLibp2pPort = 9934
// default RPC port
defaultRPCPort = 5005
defaultXMRTakerRPCPort = 5001
defaultXMRMakerRPCPort = 5002
defaultRPCPort = common.DefaultSwapdPort
defaultXMRTakerRPCPort = defaultRPCPort
defaultXMRMakerRPCPort = defaultXMRTakerRPCPort + 1
)
var (
@@ -84,6 +84,7 @@ const (
flagTransferBack = "transfer-back"
flagLogLevel = "log-level"
flagProfile = "profile"
)
var (
@@ -162,6 +163,7 @@ var (
Name: flagBootnodes,
Aliases: []string{"bn"},
Usage: "libp2p bootnode, comma separated if passing multiple to a single flag",
EnvVars: []string{"SWAPD_BOOTNODES"},
},
&cli.UintFlag{
Name: flagGasPrice,
@@ -201,6 +203,11 @@ var (
Name: flagUseExternalSigner,
Usage: "Use external signer, for usage with the swap UI",
},
&cli.StringFlag{
Name: flagProfile,
Usage: "BIND_IP:PORT to provide profiling information on",
Hidden: true, // flag is only for developers
},
},
}
)
@@ -274,6 +281,10 @@ func runDaemon(c *cli.Context) error {
return err
}
if err := maybeStartProfiler(c); err != nil {
return err
}
d := newEmptyDaemon(ctx, cancel)
if err := d.make(c); err != nil {
log.Errorf("RPC/Websocket server exited: %s", err)
@@ -337,11 +348,16 @@ func (d *daemon) stop() error {
// can be specified individually with multiple flags, but can also contain
// multiple boot nodes passed to single flag separated by commas.
func expandBootnodes(nodesCLI []string) []string {
var nodes []string
for _, n := range nodesCLI {
splitNodes := strings.Split(n, ",")
for _, ns := range splitNodes {
nodes = append(nodes, strings.TrimSpace(ns))
var nodes []string // nodes from all flag values combined
for _, flagVal := range nodesCLI {
splitNodes := strings.Split(flagVal, ",")
for _, n := range splitNodes {
n = strings.TrimSpace(n)
// Handle the empty string to not use default bootnodes. Doing it here after
// the split has the arguably positive side effect of skipping empty entries.
if len(n) > 0 {
nodes = append(nodes, strings.TrimSpace(n))
}
}
}
return nodes
@@ -378,7 +394,7 @@ func (d *daemon) make(c *cli.Context) error { //nolint:gocyclo
return err
}
if len(c.StringSlice(flagBootnodes)) > 0 {
if c.IsSet(flagBootnodes) {
cfg.Bootnodes = expandBootnodes(c.StringSlice(flagBootnodes))
}

View File

@@ -68,12 +68,15 @@ func TestDaemon_DevXMRTaker(t *testing.T) {
"test --dev-xmrtaker",
map[string]any{
flagEnv: "dev",
flagDeploy: true,
flagDevXMRTaker: true,
flagDataDir: t.TempDir(),
flagRPCPort: uint(0),
flagLibp2pPort: uint(0),
},
)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
d := &daemon{
@@ -102,6 +105,8 @@ func TestDaemon_DevXMRMaker(t *testing.T) {
flagDevXMRMaker: true,
flagDeploy: true,
flagDataDir: t.TempDir(),
flagRPCPort: uint(0),
flagLibp2pPort: uint(0),
},
)
@@ -146,8 +151,15 @@ func Test_expandBootnodes(t *testing.T) {
require.EqualValues(t, expected, expandBootnodes(cliNodes))
}
func Test_expandBootnodes_noNodes(t *testing.T) {
// This can happen when the user specifies a single `--bootnodes ""` flag
// to not use the default bootnodes for an environment.
cliNodes := []string{""}
nodes := expandBootnodes(cliNodes)
require.Zero(t, len(nodes))
}
func TestDaemon_PersistOffers(t *testing.T) {
defaultXMRMakerSwapdEndpoint := fmt.Sprintf("http://localhost:%d", defaultXMRMakerRPCPort)
startupTimeout := time.Millisecond * 100
dataDir := t.TempDir()
@@ -160,6 +172,8 @@ func TestDaemon_PersistOffers(t *testing.T) {
flagEnv: "dev",
flagDevXMRMaker: true,
flagDeploy: true,
flagRPCPort: uint(0),
flagLibp2pPort: uint(0),
flagDataDir: dataDir,
flagMoneroWalletPath: path.Join(dataDir, "test-wallet"),
},
@@ -182,12 +196,12 @@ func TestDaemon_PersistOffers(t *testing.T) {
time.Sleep(startupTimeout) // let the server start
// make an offer
client := rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
client := rpcclient.NewClient(ctx, d.rpcServer.HttpURL())
balance, err := client.Balances()
require.NoError(t, err)
require.GreaterOrEqual(t, balance.PiconeroUnlockedBalance, common.MoneroToPiconero(1))
offerID, err := client.MakeOffer(0.1, 1, float64(1), types.EthAssetETH, "", 0)
offerResp, err := client.MakeOffer(0.1, 1, float64(1), types.EthAssetETH, "", 0)
require.NoError(t, err)
// shut down daemon
@@ -206,7 +220,6 @@ func TestDaemon_PersistOffers(t *testing.T) {
defer func() {
require.NoError(t, d.stop())
}()
client = rpcclient.NewClient(ctx, defaultXMRMakerSwapdEndpoint)
wg.Add(1)
go func() {
@@ -218,8 +231,10 @@ func TestDaemon_PersistOffers(t *testing.T) {
<-d.startedCh
time.Sleep(startupTimeout) // let the server start
offers, err := client.GetOffers()
client = rpcclient.NewClient(ctx, d.rpcServer.HttpURL())
resp, err := client.GetOffers()
require.NoError(t, err)
require.Equal(t, 1, len(offers))
require.Equal(t, offerID, offers[0].ID.String())
require.Equal(t, offerResp.PeerID, resp.PeerID)
require.Equal(t, 1, len(resp.Offers))
require.Equal(t, offerResp.OfferID, resp.Offers[0].ID)
}

39
cmd/swapd/profile.go Normal file
View File

@@ -0,0 +1,39 @@
package main
import (
"net/http"
"net/http/pprof"
"time"
"github.com/urfave/cli/v2"
)
func maybeStartProfiler(c *cli.Context) error {
bindIPAndPort := c.String(flagProfile)
if bindIPAndPort == "" {
return nil
}
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
go func() {
err := http.ListenAndServe(bindIPAndPort, mux)
log.Fatalf("Profiling server failed: %s", err)
}()
time.Sleep(100 * time.Millisecond) // let the profiler start
// While some data is browsable directly via http://127.0.0.1:${YOUR_PORT}/debug/pprof,
// other parts like cpu profiling need to be parsed using "pprof". Example:
// go tool pprof http://127.0.0.1:${YOUR_PORT}/debug/pprof/profile
// (pprof) top
// [shows top CPU using functions]
// (pprof) list FUNCTION_NAME_FROM_ABOVE_OUTPUT
// [shows cpu of line numbers in function]
log.Infof("Serving pprof data (browsable): http://%s/debug/pprof", bindIPAndPort)
return nil
}

View File

@@ -305,22 +305,22 @@ func (d *daemon) takeOffer(done <-chan struct{}) {
defer wsc.Close()
const defaultDiscoverTimeout = uint64(3) // 3s
providers, err := wsc.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
peerIDs, err := wsc.Discover(types.ProvidesXMR, defaultDiscoverTimeout)
if err != nil {
d.errCh <- err
return
}
if len(providers) == 0 {
if len(peerIDs) == 0 {
return
}
makerIdx := getRandomInt(len(providers))
peer := providers[makerIdx][0]
makerIdx := getRandomInt(len(peerIDs))
peerID := peerIDs[makerIdx]
log.Debugf("node %d querying peer %s...", d.idx, peer)
log.Debugf("node %d querying peer %s...", d.idx, peerID)
resp, err := wsc.Query(peer)
resp, err := wsc.Query(peerID)
if err != nil {
d.errCh <- err
return
@@ -334,14 +334,13 @@ func (d *daemon) takeOffer(done <-chan struct{}) {
offer := resp.Offers[offerIdx]
// pick random amount between min and max
amount := offer.MinimumAmount + mrand.Float64()*(offer.MaximumAmount-offer.MinimumAmount) //nolint:gosec
amount := offer.MinAmount + mrand.Float64()*(offer.MaxAmount-offer.MinAmount) //nolint:gosec
providesAmount := offer.ExchangeRate.ToETH(amount)
start := time.Now()
log.Infof("node %d taking offer %s", d.idx, offer.ID.String())
log.Infof("node %d taking offer %s", d.idx, offer.ID)
takerStatusCh, err := wsc.TakeOfferAndSubscribe(peer,
offer.ID.String(), providesAmount)
takerStatusCh, err := wsc.TakeOfferAndSubscribe(peerID, offer.ID, providesAmount)
if err != nil {
d.errCh <- err
return