fix: fix swap status channel setting on restart (#444)

This commit is contained in:
noot
2023-04-28 23:29:39 +02:00
committed by GitHub
parent 19014027a0
commit 906a5c69a3
8 changed files with 40 additions and 67 deletions

View File

@@ -127,47 +127,36 @@ func TestAliceDoubleRestartAfterXMRLock(t *testing.T) {
// relaunch the daemons (2nd and final time) overwriting ctx
t.Logf("daemons stopped, now re-launching them")
ctx, _ = LaunchDaemons(t, 5*time.Minute, bobConf, aliceConf)
t.Logf("daemons relaunched, waiting a few seconds before checking swap status")
t.Logf("daemons relaunched, checking swap status")
/*
* We'll switch to the commented out solution when the status channel is recreated
* on restart.
*/
// // Give alice a fresh client with a fresh context
// aws, err = wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
// require.NoError(t, err)
// aliceStatusCh, err = aws.SubscribeSwapStatus(makeResp.OfferID)
// require.NoError(t, err)
//
// // In the loop below, Alice's initial state is most likely still SweepingXMR.
//endLoop:
// for {
// select {
// case status := <-aliceStatusCh:
// t.Log("> Alice got status:", status)
// if !status.IsOngoing() {
// break endLoop
// }
// case <-ctx.Done():
// t.Logf("Alice's context cancelled before she completed the swap [expected]")
// break endLoop
// }
// }
// Give alice a fresh client with a fresh context
aws, err = wsclient.NewWsClient(ctx, fmt.Sprintf("ws://127.0.0.1:%d/ws", aliceConf.RPCPort))
require.NoError(t, err)
aliceStatusCh, err = aws.SubscribeSwapStatus(makeResp.OfferID)
require.NoError(t, err)
t.Logf("subscribed to Alice's swap status")
// Temporary solution until solution above can be uncommented
for i := 0; i < 5; i++ {
pastSwap, err := ac.GetPastSwap(&makeResp.OfferID) //nolint:govet
require.NoError(t, err)
status := pastSwap.Swaps[0].Status
t.Logf("Alice past status: %s", status)
if status.IsOngoing() {
time.Sleep(2 * time.Second)
continue
// In the loop below, Alice's initial state is most likely still SweepingXMR.
endLoop:
for {
select {
case status := <-aliceStatusCh:
t.Log("> Alice got status:", status)
if !status.IsOngoing() {
break endLoop
}
case <-ctx.Done():
t.Logf("Alice's context cancelled before she completed the swap [expected]")
break endLoop
}
require.Equal(t, types.CompletedSuccess.String(), pastSwap.Swaps[0].Status.String())
}
pastSwap, err := bc.GetPastSwap(&makeResp.OfferID)
pastSwap, err := ac.GetPastSwap(&makeResp.OfferID)
require.NoError(t, err)
t.Logf("Alice past status: %s", pastSwap.Swaps[0].Status)
require.Equal(t, types.CompletedSuccess.String(), pastSwap.Swaps[0].Status.String())
pastSwap, err = bc.GetPastSwap(&makeResp.OfferID)
require.NoError(t, err)
t.Logf("Bob past status: %s", pastSwap.Swaps[0].Status)
require.Equal(t, types.CompletedSuccess.String(), pastSwap.Swaps[0].Status.String())

View File

@@ -105,11 +105,7 @@ func setSweepStatus(info *swap.Info, sm SwapManager) error {
info.SetStatus(types.SweepingXMR)
err := sm.WriteSwapToDB(info)
if err != nil {
return err
}
if info.StatusCh() != nil {
info.StatusCh() <- types.SweepingXMR
return fmt.Errorf("failed to write swap to db: %w", err)
}
return nil

View File

@@ -18,6 +18,8 @@ import (
"github.com/athanorlabs/atomic-swap/common/vjson"
)
const statusChSize = 6 // the max number of stages a swap can potentially go through
var (
// CurInfoVersion is the latest supported version of a serialised Info struct
CurInfoVersion, _ = semver.NewVersion("0.3.0")
@@ -97,7 +99,7 @@ func NewInfo(
}
// StatusCh returns the swap's status update channel.
func (i *Info) StatusCh() chan types.Status {
func (i *Info) StatusCh() <-chan types.Status {
return i.statusCh
}
@@ -105,6 +107,11 @@ func (i *Info) StatusCh() chan types.Status {
func (i *Info) SetStatus(s Status) {
i.Status = s
i.LastStatusUpdateTime = time.Now()
if i.statusCh == nil {
// this case only happens in tests.
return
}
i.statusCh <- s
}
// IsTaker returns true if the node is the xmr-taker in the swap.
@@ -135,6 +142,8 @@ func UnmarshalInfo(jsonData []byte) (*Info, error) {
return nil, err
}
info.statusCh = make(chan types.Status, statusChSize)
// TODO: Are there additional sanity checks we can perform on the Provided and Received amounts
// (or other fields) here when decoding the JSON?
return info, nil

View File

@@ -54,9 +54,6 @@ func (s *swapState) HandleProtocolMessage(msg common.Message) error {
func (s *swapState) clearNextExpectedEvent(status types.Status) {
s.nextExpectedEvent = EventNoneType
s.info.SetStatus(status)
if s.offerExtra.StatusCh != nil {
s.offerExtra.StatusCh <- status
}
}
func (s *swapState) setNextExpectedEvent(event EventType) error {
@@ -82,10 +79,6 @@ func (s *swapState) setNextExpectedEvent(event EventType) error {
return err
}
if s.offerExtra.StatusCh != nil {
s.offerExtra.StatusCh <- status
}
return nil
}

View File

@@ -246,7 +246,7 @@ func TestSwapState_HandleProtocolMessage_NotifyETHLocked_timeout(t *testing.T) {
go s.runT0ExpirationHandler()
for status := range s.offerExtra.StatusCh {
for status := range s.info.StatusCh() {
if status == types.CompletedSuccess {
break
} else if !status.IsOngoing() {

View File

@@ -41,9 +41,6 @@ func (s *swapState) HandleProtocolMessage(msg common.Message) error {
func (s *swapState) clearNextExpectedEvent(status types.Status) {
s.nextExpectedEvent = EventNoneType
s.info.SetStatus(status)
if s.statusCh != nil {
s.statusCh <- status
}
}
func (s *swapState) setNextExpectedEvent(event EventType) error {
@@ -65,16 +62,7 @@ func (s *swapState) setNextExpectedEvent(event EventType) error {
log.Debugf("setting status to %s", status)
s.info.SetStatus(status)
err := s.Backend.SwapManager().WriteSwapToDB(s.info)
if err != nil {
return err
}
if s.info.StatusCh() != nil {
s.info.StatusCh() <- status
}
return nil
return s.Backend.SwapManager().WriteSwapToDB(s.info)
}
func (s *swapState) handleSendKeysMessage(msg *message.SendKeysMessage) (common.Message, error) {

View File

@@ -53,7 +53,6 @@ type swapState struct {
noTransferBack bool
info *pswap.Info
statusCh chan types.Status
providedAmount coins.EthAssetAmount
// our keys for this session
@@ -296,7 +295,6 @@ func newSwapState(
done: make(chan struct{}),
info: info,
providedAmount: providedAmt,
statusCh: info.StatusCh(),
}
go s.runHandleEvents()

View File

@@ -250,7 +250,7 @@ func TestSwapState_HandleProtocolMessage_SendKeysMessage_Refund(t *testing.T) {
require.Equal(t, xmrmakerKeysAndProof.PrivateKeyPair.ViewKey().String(), s.xmrmakerPrivateViewKey.String())
// ensure we refund before t0
for status := range s.statusCh {
for status := range s.info.StatusCh() {
if status == types.CompletedRefund {
// check this is before t0
// TODO: remove the 10-second buffer, this is needed for now
@@ -344,7 +344,7 @@ func TestSwapState_NotifyXMRLock_Refund(t *testing.T) {
require.NoError(t, err)
require.Equal(t, EventETHClaimedType, s.nextExpectedEvent)
for status := range s.statusCh {
for status := range s.info.StatusCh() {
if status == types.CompletedRefund {
// check this is after t1
require.Less(t, s.t1, time.Now())