enhancement/node-stopping (#46)

* made node stoppable

* updated

* added todo

* comments

* remove/ecsda (#50)
This commit is contained in:
Dean Eigenmann
2019-06-14 09:49:56 -04:00
committed by GitHub
parent 37f5afc426
commit 4046fbed44
4 changed files with 43 additions and 26 deletions

13
main.go
View File

@@ -1,8 +1,6 @@
package main
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"flag"
"fmt"
@@ -78,7 +76,7 @@ func main() {
}
for _, n := range nodes {
n.Run()
n.Start()
}
chat(group, nodes[:communicating-1]...)
@@ -138,8 +136,13 @@ func Calc(count uint64, epoch int64) int64 {
}
func peerID() state.PeerID {
key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
return state.PublicKeyToPeerID(key.PublicKey)
bytes := make([]byte, 65)
rand.Read(bytes)
id := state.PeerID{}
copy(id[:], bytes)
return id
}
func groupId() state.GroupID {

View File

@@ -5,6 +5,7 @@ package node
import (
"bytes"
"context"
"fmt"
"log"
"sync/atomic"
@@ -28,6 +29,9 @@ type calculateNextEpoch func(count uint64, epoch int64) int64
// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages.
type Node struct {
ctx context.Context
cancel context.CancelFunc
store store.MessageStore
transport transport.Transport
@@ -55,7 +59,11 @@ func NewNode(
id state.PeerID,
mode Mode,
) *Node {
ctx, cancel := context.WithCancel(context.Background())
return &Node{
ctx: ctx,
cancel: cancel,
store: ms,
transport: st,
syncState: ss,
@@ -68,26 +76,43 @@ func NewNode(
}
}
// Run listens for new messages received by the node and sends out those required every epoch.
func (n *Node) Run() {
// Start listens for new messages received by the node and sends out those required every epoch.
func (n *Node) Start() {
go func() {
for {
p := n.transport.Watch()
go n.onPayload(p.Group, p.Sender, p.Payload)
select {
case <-n.ctx.Done():
log.Print("Watch stopped")
return
default:
p := n.transport.Watch()
go n.onPayload(p.Group, p.Sender, p.Payload)
}
}
}()
go func() {
for {
log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch)
time.Sleep(1 * time.Second)
select {
case <-n.ctx.Done():
log.Print("Epoch processing stopped")
return
default:
log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch)
time.Sleep(1 * time.Second)
n.sendMessages()
atomic.AddInt64(&n.epoch, 1)
n.sendMessages()
atomic.AddInt64(&n.epoch, 1)
}
}
}()
}
// Stop message reading and epoch processing
func (n *Node) Stop() {
n.cancel()
}
// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID, error) {
m := protobuf.Message{

View File

@@ -1,16 +1,3 @@
package state
import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/crypto"
)
type PeerID [65]byte
// Turns an ECSDA PublicKey to a PeerID
func PublicKeyToPeerID(k ecdsa.PublicKey) PeerID {
var p PeerID
copy(p[:], crypto.FromECDSAPub(&k))
return p
}

View File

@@ -37,6 +37,8 @@ func (t *ChannelTransport) Watch() Packet {
}
func (t *ChannelTransport) Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
// @todo we can do this better, we put node onlineness into a goroutine where we just stop the nodes for x seconds
// outside of this class
math.Seed(time.Now().UnixNano())
if math.Intn(100) < t.offline {
return nil