mirror of
https://github.com/noot/try-dandelion.git
synced 2026-01-10 14:28:08 -05:00
352 lines
6.4 KiB
Go
352 lines
6.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math/big"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p"
|
|
libp2phost "github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/libp2p/go-libp2p-pubsub"
|
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
logging "github.com/ipfs/go-log"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
const (
|
|
protocolID = "/dandelion"
|
|
defaultKeyFile = "net.key"
|
|
|
|
testTopic = "test"
|
|
)
|
|
|
|
var log = logging.Logger("main")
|
|
|
|
var (
|
|
flagCount = "count"
|
|
flagDuration = "duration"
|
|
flagDandelion = "dandelion"
|
|
|
|
app = &cli.App{
|
|
Name: "try-dandelion",
|
|
Usage: "try out libp2p nodes with dandelion++ gossip",
|
|
Action: run,
|
|
EnableBashCompletion: true,
|
|
Suggest: true,
|
|
Flags: []cli.Flag{
|
|
&cli.UintFlag{
|
|
Name: flagCount,
|
|
Usage: "number of nodes to run",
|
|
Value: 10,
|
|
},
|
|
&cli.UintFlag{
|
|
Name: flagDuration,
|
|
Usage: "length of time to run simulation in seconds",
|
|
Value: 60,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: flagDandelion,
|
|
Usage: "whether to run with dandelion or normal gossipsub",
|
|
Value: false,
|
|
},
|
|
},
|
|
}
|
|
)
|
|
|
|
func main() {
|
|
if err := app.Run(os.Args); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func run(c *cli.Context) error {
|
|
_ = logging.SetLogLevel("main", "info")
|
|
|
|
const basePort = 6000
|
|
|
|
bootnodes := []peer.AddrInfo{}
|
|
hosts := []*host{}
|
|
|
|
count := int(c.Uint(flagCount))
|
|
mlt := newMessageLatencyTracker(count)
|
|
|
|
for i := 0; i < count; i++ {
|
|
log.Infof("starting node %d", i)
|
|
cfg := &config{
|
|
Ctx: context.Background(),
|
|
Port: uint16(basePort + i),
|
|
Bootnodes: bootnodes,
|
|
Index: i,
|
|
Tracker: mlt,
|
|
dandelion: c.Bool(flagDandelion),
|
|
}
|
|
|
|
h, err := NewHost(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = h.Start()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
time.Sleep(time.Millisecond * 200)
|
|
log.Infof("node %d started: %s", i, h.addrInfo())
|
|
bootnodes = append(bootnodes, h.addrInfo())
|
|
hosts = append(hosts, h)
|
|
}
|
|
|
|
duration, err := time.ParseDuration(fmt.Sprintf("%ds", c.Uint(flagDuration)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
<-time.After(duration)
|
|
|
|
for _, h := range hosts {
|
|
err := h.Stop()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
mlt.log()
|
|
return nil
|
|
}
|
|
|
|
type config struct {
|
|
Ctx context.Context
|
|
Port uint16
|
|
KeyFile string
|
|
Bootnodes []peer.AddrInfo
|
|
Index int
|
|
Tracker *messageLatencyTracker
|
|
dandelion bool
|
|
}
|
|
|
|
type host struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
h libp2phost.Host
|
|
bootnodes []peer.AddrInfo
|
|
topics map[string]*pubsub.Topic
|
|
tracker *messageLatencyTracker
|
|
}
|
|
|
|
func NewHost(cfg *config) (*host, error) {
|
|
if cfg.KeyFile == "" {
|
|
cfg.KeyFile = fmt.Sprintf("node-%d.key", cfg.Index)
|
|
}
|
|
|
|
key, err := loadKey(cfg.KeyFile)
|
|
if err != nil {
|
|
log.Infof("failed to load libp2p key, generating key %s...", cfg.KeyFile)
|
|
key, err = generateKey(0, cfg.KeyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts := []libp2p.Option{
|
|
libp2p.ListenAddrs(addr),
|
|
libp2p.Identity(key),
|
|
libp2p.NATPortMap(),
|
|
}
|
|
|
|
h, err := libp2p.New(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
psOpts := []pubsub.Option{
|
|
pubsub.WithMessageIdFn(messageIDFn),
|
|
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
|
|
}
|
|
|
|
if cfg.dandelion {
|
|
psOpts = append(psOpts, pubsub.WithDandelion())
|
|
}
|
|
|
|
ps, err := pubsub.NewGossipSub(cfg.Ctx, h, psOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topic, err := ps.Join(testTopic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topics := map[string]*pubsub.Topic{
|
|
testTopic: topic,
|
|
}
|
|
|
|
ourCtx, cancel := context.WithCancel(cfg.Ctx)
|
|
return &host{
|
|
ctx: ourCtx,
|
|
cancel: cancel,
|
|
h: h,
|
|
bootnodes: cfg.Bootnodes,
|
|
topics: topics,
|
|
tracker: cfg.Tracker,
|
|
}, nil
|
|
}
|
|
|
|
func messageIDFn(pmsg *pb.Message) string {
|
|
return getMessageIDFromData(pmsg.Data)
|
|
}
|
|
|
|
func getMessageIDFromData(data []byte) string {
|
|
h := sha256.Sum256(data)
|
|
return hex.EncodeToString(h[:])
|
|
}
|
|
|
|
func (h *host) addrInfo() peer.AddrInfo {
|
|
return peer.AddrInfo{
|
|
ID: h.h.ID(),
|
|
Addrs: h.h.Addrs(),
|
|
}
|
|
}
|
|
|
|
func (h *host) Start() error {
|
|
err := h.bootstrap()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for topic := range h.topics {
|
|
go func() {
|
|
err := h.receive(topic)
|
|
if err != nil {
|
|
log.Warnf("receive loop exiting: %s", err)
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
randDuration, err := rand.Int(rand.Reader, big.NewInt(20))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Second * time.Duration(20+randDuration.Int64()))
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-h.ctx.Done():
|
|
ticker.Stop()
|
|
return
|
|
case <-ticker.C:
|
|
ok := true
|
|
go func() {
|
|
err := h.publishRandomMessages()
|
|
if err != nil {
|
|
log.Warnf("send loop exiting: %s", err)
|
|
ok = false
|
|
return
|
|
}
|
|
}()
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *host) Stop() error {
|
|
h.cancel()
|
|
if err := h.h.Close(); err != nil {
|
|
return fmt.Errorf("failed to close libp2p host: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *host) publishRandomMessages() error {
|
|
const messageLength = 16
|
|
for topic := range h.topics {
|
|
msg := make([]byte, messageLength)
|
|
_, err := rand.Read(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = h.publish(topic, msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// bootstrap connects the host to the configured bootnodes
|
|
func (h *host) bootstrap() error {
|
|
failed := 0
|
|
for _, addrInfo := range h.bootnodes {
|
|
log.Debugf("bootstrapping to peer: peer=%s", addrInfo.ID)
|
|
err := h.h.Connect(h.ctx, addrInfo)
|
|
if err != nil {
|
|
log.Debugf("failed to bootstrap to peer: err=%s", err)
|
|
failed++
|
|
}
|
|
}
|
|
|
|
if failed == len(h.bootnodes) && len(h.bootnodes) != 0 {
|
|
return errFailedToBootstrap
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *host) publish(topic string, msg []byte) error {
|
|
t, has := h.topics[topic]
|
|
if !has {
|
|
return errNoTopic
|
|
}
|
|
|
|
id := getMessageIDFromData(msg)
|
|
log.Infof("sent message on topic %s: id %s", topic, id)
|
|
h.tracker.logSent(id)
|
|
return t.Publish(h.ctx, msg)
|
|
}
|
|
|
|
func (h *host) receive(topic string) error {
|
|
t, has := h.topics[topic]
|
|
if !has {
|
|
return errNoTopic
|
|
}
|
|
|
|
sub, err := t.Subscribe()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
msg, err := sub.Next(h.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("got message on topic %s: id %s", topic, msg.ID)
|
|
h.tracker.logReceived(msg.ID)
|
|
}
|
|
|
|
return nil
|
|
}
|