mirror of
https://github.com/vacp2p/mvds.git
synced 2026-01-10 04:27:58 -05:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6e703eade | ||
|
|
e786de21ff | ||
|
|
d04eae22cf | ||
|
|
9a5ef0aa5d | ||
|
|
337ca88bfd |
24
node.go
24
node.go
@@ -23,7 +23,7 @@ type Node struct {
|
||||
sharing map[GroupID][]PeerId
|
||||
peers map[GroupID][]PeerId
|
||||
|
||||
payloads Payloads
|
||||
payloads payloads
|
||||
|
||||
nextEpoch calculateNextEpoch
|
||||
|
||||
@@ -39,7 +39,7 @@ func NewNode(ms MessageStore, st Transport, nextEpoch calculateNextEpoch, id Pee
|
||||
syncState: newSyncState(),
|
||||
sharing: make(map[GroupID][]PeerId),
|
||||
peers: make(map[GroupID][]PeerId),
|
||||
payloads: Payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
|
||||
payloads: payloads{payloads: make(map[GroupID]map[PeerId]Payload)},
|
||||
nextEpoch: nextEpoch,
|
||||
ID: id,
|
||||
epoch: 0,
|
||||
@@ -125,6 +125,16 @@ func (n *Node) Share(group GroupID, id PeerId) {
|
||||
n.sharing[group] = append(n.sharing[group], id)
|
||||
}
|
||||
|
||||
func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {
|
||||
for _, peer := range n.sharing[g] {
|
||||
if peer == p {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (n *Node) sendMessages() {
|
||||
n.syncState.Map(func(g GroupID, m MessageID, p PeerId, s state) state {
|
||||
if s.SendEpoch < n.epoch || !n.IsPeerInGroup(g, p) {
|
||||
@@ -261,16 +271,6 @@ func (n Node) updateSendEpoch(s state) state {
|
||||
return s
|
||||
}
|
||||
|
||||
func (n Node) IsPeerInGroup(g GroupID, p PeerId) bool {
|
||||
for _, peer := range n.sharing[g] {
|
||||
if peer == p {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func toMessageID(b []byte) MessageID {
|
||||
var id MessageID
|
||||
copy(id[:], b)
|
||||
|
||||
119
node_test.go
119
node_test.go
@@ -1,119 +0,0 @@
|
||||
package mvds
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestOnRequest(t *testing.T) {
|
||||
m := randomMessageId()
|
||||
g := GroupID{}
|
||||
|
||||
r := Request{}
|
||||
r.Id = append(r.Id, m[:])
|
||||
|
||||
n := getNodeForMessageHandlerTest()
|
||||
|
||||
p := randomPeerId()
|
||||
n.onRequest(g, p, r)
|
||||
|
||||
if n.syncState.Get(g, m, p).RequestFlag != true {
|
||||
t.Errorf("did not set Request flag to true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnAck(t *testing.T) {
|
||||
m := randomMessageId()
|
||||
g := GroupID{}
|
||||
|
||||
a := Ack{}
|
||||
a.Id = append(a.Id, m[:])
|
||||
|
||||
n := getNodeForMessageHandlerTest()
|
||||
|
||||
p := randomPeerId()
|
||||
n.onAck(g, p, a)
|
||||
|
||||
if n.syncState.Get(g, m, p).HoldFlag != true {
|
||||
t.Errorf("did not set Hold flag to true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnOffer(t *testing.T) {
|
||||
m := randomMessageId()
|
||||
g := GroupID{}
|
||||
|
||||
o := Offer{}
|
||||
o.Id = append(o.Id, m[:])
|
||||
|
||||
n := getNodeForMessageHandlerTest()
|
||||
|
||||
p := randomPeerId()
|
||||
n.onOffer(g, p, o)
|
||||
|
||||
if n.syncState.Get(g, m, p).HoldFlag != true {
|
||||
t.Errorf("did not set Hold flag to true")
|
||||
}
|
||||
|
||||
if n.offeredMessages[g][p][0] != m {
|
||||
t.Errorf("message was not added to offered list")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnMessage(t *testing.T) {
|
||||
n := getNodeForMessageHandlerTest()
|
||||
g := GroupID{}
|
||||
|
||||
ds := NewDummyStore()
|
||||
n.store = &ds
|
||||
|
||||
id := randomMessageId()
|
||||
|
||||
m := Message{
|
||||
GroupId: id[:],
|
||||
Timestamp: time.Now().Unix(),
|
||||
Body: []byte("hello world"),
|
||||
}
|
||||
|
||||
p := randomPeerId()
|
||||
|
||||
n.onMessage(g, p, m)
|
||||
|
||||
sm, _ := n.store.Get(m.ID())
|
||||
if !reflect.DeepEqual(sm, m) {
|
||||
t.Errorf("message was not stored correctly")
|
||||
}
|
||||
|
||||
s := n.state(g, m.ID(), p)
|
||||
|
||||
if s.HoldFlag != true || s.AckFlag != true {
|
||||
t.Errorf("did not set flags")
|
||||
}
|
||||
}
|
||||
|
||||
func getNodeForMessageHandlerTest() Node {
|
||||
n := Node{}
|
||||
n.syncState = syncState{state: make(map[GroupID]map[MessageID]map[PeerId]*State)}
|
||||
n.offeredMessages = make(map[GroupID]map[PeerId][]MessageID)
|
||||
n.ID = randomPeerId()
|
||||
return n
|
||||
}
|
||||
|
||||
func randomMessageId() MessageID {
|
||||
bytes := make([]byte, 32)
|
||||
rand.Read(bytes)
|
||||
|
||||
id := MessageID{}
|
||||
copy(id[:], bytes)
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func randomPeerId() PeerId {
|
||||
key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
return PeerId(key.PublicKey)
|
||||
}
|
||||
43
payloads.go
43
payloads.go
@@ -4,29 +4,14 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Payloads struct {
|
||||
type payloads struct {
|
||||
sync.Mutex
|
||||
|
||||
payloads map[GroupID]map[PeerId]Payload
|
||||
}
|
||||
|
||||
func (p *Payloads) get(id GroupID, peerId PeerId) Payload {
|
||||
payload, _ := p.payloads[id][peerId]
|
||||
return payload
|
||||
}
|
||||
|
||||
func (p *Payloads) set(id GroupID, peerId PeerId, payload Payload) {
|
||||
_, ok := p.payloads[id]
|
||||
if !ok {
|
||||
p.payloads[id] = make(map[PeerId]Payload)
|
||||
}
|
||||
|
||||
p.payloads[id][peerId] = payload
|
||||
}
|
||||
|
||||
// @todo check in all the functions below that we aren't duplicating stuff
|
||||
|
||||
func (p *Payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
|
||||
func (p *payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -40,7 +25,7 @@ func (p *Payloads) AddOffers(group GroupID, peer PeerId, offers ...[]byte) {
|
||||
p.set(group, peer, payload)
|
||||
}
|
||||
|
||||
func (p *Payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
|
||||
func (p *payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -54,7 +39,7 @@ func (p *Payloads) AddAcks(group GroupID, peer PeerId, acks ...[]byte) {
|
||||
p.set(group, peer, payload)
|
||||
}
|
||||
|
||||
func (p *Payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
|
||||
func (p *payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -68,7 +53,7 @@ func (p *Payloads) AddRequests(group GroupID, peer PeerId, request ...[]byte) {
|
||||
p.set(group, peer, payload)
|
||||
}
|
||||
|
||||
func (p *Payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
|
||||
func (p *payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -81,7 +66,7 @@ func (p *Payloads) AddMessages(group GroupID, peer PeerId, messages ...*Message)
|
||||
p.set(group, peer, payload)
|
||||
}
|
||||
|
||||
func (p *Payloads) Map(f func(GroupID, PeerId, Payload)) {
|
||||
func (p *payloads) Map(f func(GroupID, PeerId, Payload)) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@@ -94,9 +79,23 @@ func (p *Payloads) Map(f func(GroupID, PeerId, Payload)) {
|
||||
p.payloads = make(map[GroupID]map[PeerId]Payload)
|
||||
}
|
||||
|
||||
func (p *Payloads) RemoveAll() {
|
||||
func (p *payloads) RemoveAll() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
p.payloads = make(map[GroupID]map[PeerId]Payload)
|
||||
}
|
||||
|
||||
func (p *payloads) get(id GroupID, peerId PeerId) Payload {
|
||||
payload, _ := p.payloads[id][peerId]
|
||||
return payload
|
||||
}
|
||||
|
||||
func (p *payloads) set(id GroupID, peerId PeerId, payload Payload) {
|
||||
_, ok := p.payloads[id]
|
||||
if !ok {
|
||||
p.payloads[id] = make(map[PeerId]Payload)
|
||||
}
|
||||
|
||||
p.payloads[id][peerId] = payload
|
||||
}
|
||||
|
||||
60
state.go
60
state.go
@@ -1,63 +1,13 @@
|
||||
package mvds
|
||||
|
||||
import "sync"
|
||||
|
||||
type state struct {
|
||||
SendCount uint64
|
||||
SendEpoch int64
|
||||
}
|
||||
|
||||
type syncState struct {
|
||||
sync.Mutex
|
||||
|
||||
state map[GroupID]map[MessageID]map[PeerId]state
|
||||
}
|
||||
|
||||
func newSyncState() syncState {
|
||||
return syncState{
|
||||
state: make(map[GroupID]map[MessageID]map[PeerId]state),
|
||||
}
|
||||
}
|
||||
|
||||
func (s syncState) Get(group GroupID, id MessageID, sender PeerId) state {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
state, _ := s.state[group][id][sender]
|
||||
return state
|
||||
}
|
||||
|
||||
func (s *syncState) Set(group GroupID, id MessageID, sender PeerId, newState state) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if _, ok := s.state[group]; !ok {
|
||||
s.state[group] = make(map[MessageID]map[PeerId]state)
|
||||
}
|
||||
|
||||
if _, ok := s.state[group][id]; !ok {
|
||||
s.state[group][id] = make(map[PeerId]state)
|
||||
}
|
||||
|
||||
s.state[group][id][sender] = newState
|
||||
}
|
||||
|
||||
func (s *syncState) Remove(group GroupID, id MessageID, sender PeerId) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
delete(s.state[group][id], sender)
|
||||
}
|
||||
|
||||
func (s *syncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for group, syncstate := range s.state {
|
||||
for id, peers := range syncstate {
|
||||
for peer, state := range peers {
|
||||
s.state[group][id][peer] = process(group, id, peer, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
type syncState interface {
|
||||
Get(group GroupID, id MessageID, sender PeerId) state
|
||||
Set(group GroupID, id MessageID, sender PeerId, newState state)
|
||||
Remove(group GroupID, id MessageID, sender PeerId)
|
||||
Map(process func(g GroupID, m MessageID, p PeerId, s state) state)
|
||||
}
|
||||
|
||||
58
state_memory.go
Normal file
58
state_memory.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package mvds
|
||||
|
||||
import "sync"
|
||||
|
||||
type memorySyncState struct {
|
||||
sync.Mutex
|
||||
|
||||
state map[GroupID]map[MessageID]map[PeerId]state
|
||||
}
|
||||
|
||||
func newSyncState() *memorySyncState {
|
||||
return &memorySyncState{
|
||||
state: make(map[GroupID]map[MessageID]map[PeerId]state),
|
||||
}
|
||||
}
|
||||
|
||||
func (s memorySyncState) Get(group GroupID, id MessageID, sender PeerId) state {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
state, _ := s.state[group][id][sender]
|
||||
return state
|
||||
}
|
||||
|
||||
func (s *memorySyncState) Set(group GroupID, id MessageID, sender PeerId, newState state) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if _, ok := s.state[group]; !ok {
|
||||
s.state[group] = make(map[MessageID]map[PeerId]state)
|
||||
}
|
||||
|
||||
if _, ok := s.state[group][id]; !ok {
|
||||
s.state[group][id] = make(map[PeerId]state)
|
||||
}
|
||||
|
||||
s.state[group][id][sender] = newState
|
||||
}
|
||||
|
||||
func (s *memorySyncState) Remove(group GroupID, id MessageID, sender PeerId) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
delete(s.state[group][id], sender)
|
||||
}
|
||||
|
||||
func (s *memorySyncState) Map(process func(g GroupID, m MessageID, p PeerId, s state) state) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for group, syncstate := range s.state {
|
||||
for id, peers := range syncstate {
|
||||
for peer, state := range peers {
|
||||
s.state[group][id][peer] = process(group, id, peer, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user