enhancement/request-retransmission (#29)

* todo

* minor changes

* seemingly fixes race
This commit is contained in:
Dean Eigenmann
2019-06-15 16:42:54 -04:00
committed by GitHub
parent 4046fbed44
commit 61a94cab47
2 changed files with 43 additions and 14 deletions

View File

@@ -140,7 +140,9 @@ func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID,
}
if n.mode == INTERACTIVE {
s := state.State{}
s := state.State{
Type: state.OFFER,
}
s.SendEpoch = n.epoch + 1
err := n.syncState.Set(group, id, p, s)
@@ -189,7 +191,13 @@ func (n *Node) sendMessages() {
return s
}
n.payloads.AddOffers(g, p, m[:])
switch s.Type {
case state.OFFER:
n.payloads.AddOffers(g, p, m[:])
case state.REQUEST:
n.payloads.AddRequests(g, p, m[:])
}
return n.updateSendEpoch(s)
})
@@ -216,7 +224,7 @@ func (n *Node) onPayload(group state.GroupID, sender state.PeerID, payload proto
}
if payload.Offer != nil {
n.payloads.AddRequests(group, sender, n.onOffer(group, sender, *payload.Offer)...)
n.onOffer(group, sender, *payload.Offer)
}
if payload.Messages != nil {
@@ -224,9 +232,7 @@ func (n *Node) onPayload(group state.GroupID, sender state.PeerID, payload proto
}
}
func (n *Node) onOffer(group state.GroupID, sender state.PeerID, msg protobuf.Offer) [][]byte {
r := make([][]byte, 0)
func (n *Node) onOffer(group state.GroupID, sender state.PeerID, msg protobuf.Offer) {
for _, raw := range msg.Id {
id := toMessageID(raw)
log.Printf("[%x] OFFER (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])
@@ -236,11 +242,18 @@ func (n *Node) onOffer(group state.GroupID, sender state.PeerID, msg protobuf.Of
continue
}
r = append(r, raw)
s := state.State{
Type: state.REQUEST,
SendEpoch: n.epoch + 1, // @todo we wanna update send time here because from this block we are already sending in current epoch
}
err := n.syncState.Set(group, id, sender, s)
if err != nil {
log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), group[:4], id[:4], sender[:4])
}
log.Printf("[%x] sending REQUEST (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4])
}
return r
}
func (n *Node) onRequest(group state.GroupID, sender state.PeerID, msg protobuf.Request) []*protobuf.Message {
@@ -318,14 +331,22 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
id := state.ID(msg)
log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])
err := n.syncState.Remove(group, id, sender)
if err != nil {
return err
}
go func() {
for _, peer := range n.peers[group] {
if peer == sender {
continue
}
s := state.State{}
s.SendEpoch = n.epoch + 1
s := state.State{
Type: state.OFFER,
SendEpoch: n.epoch + 1,
}
err := n.syncState.Set(group, id, peer, s)
if err != nil {
log.Printf("error while setting sync state %s", err.Error())
@@ -333,7 +354,7 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
}
}()
err := n.store.Add(msg)
err = n.store.Add(msg)
if err != nil {
return err
// @todo process, should this function ever even have an error?

View File

@@ -1,9 +1,17 @@
// Package state contains everything related to the synchronization state for MVDS.
package state
type MessageType int
const (
OFFER MessageType = iota
REQUEST
)
type State struct {
SendCount uint64
SendEpoch int64
Type MessageType
SendCount uint64
SendEpoch int64
}
type SyncState interface {