Files
try-dandelion/main.go
2022-09-18 05:49:43 -04:00

352 lines
6.4 KiB
Go

package main
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"math/big"
"os"
"time"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
ma "github.com/multiformats/go-multiaddr"
logging "github.com/ipfs/go-log"
"github.com/urfave/cli/v2"
)
const (
protocolID = "/dandelion"
defaultKeyFile = "net.key"
testTopic = "test"
)
var log = logging.Logger("main")
var (
flagCount = "count"
flagDuration = "duration"
flagDandelion = "dandelion"
app = &cli.App{
Name: "try-dandelion",
Usage: "try out libp2p nodes with dandelion++ gossip",
Action: run,
EnableBashCompletion: true,
Suggest: true,
Flags: []cli.Flag{
&cli.UintFlag{
Name: flagCount,
Usage: "number of nodes to run",
Value: 10,
},
&cli.UintFlag{
Name: flagDuration,
Usage: "length of time to run simulation in seconds",
Value: 60,
},
&cli.BoolFlag{
Name: flagDandelion,
Usage: "whether to run with dandelion or normal gossipsub",
Value: false,
},
},
}
)
func main() {
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
func run(c *cli.Context) error {
_ = logging.SetLogLevel("main", "info")
const basePort = 6000
bootnodes := []peer.AddrInfo{}
hosts := []*host{}
count := int(c.Uint(flagCount))
mlt := newMessageLatencyTracker(count)
for i := 0; i < count; i++ {
log.Infof("starting node %d", i)
cfg := &config{
Ctx: context.Background(),
Port: uint16(basePort + i),
Bootnodes: bootnodes,
Index: i,
Tracker: mlt,
dandelion: c.Bool(flagDandelion),
}
h, err := NewHost(cfg)
if err != nil {
return err
}
err = h.Start()
if err != nil {
return err
}
time.Sleep(time.Millisecond * 200)
log.Infof("node %d started: %s", i, h.addrInfo())
bootnodes = append(bootnodes, h.addrInfo())
hosts = append(hosts, h)
}
duration, err := time.ParseDuration(fmt.Sprintf("%ds", c.Uint(flagDuration)))
if err != nil {
return err
}
<-time.After(duration)
for _, h := range hosts {
err := h.Stop()
if err != nil {
return err
}
}
mlt.log()
return nil
}
type config struct {
Ctx context.Context
Port uint16
KeyFile string
Bootnodes []peer.AddrInfo
Index int
Tracker *messageLatencyTracker
dandelion bool
}
type host struct {
ctx context.Context
cancel context.CancelFunc
h libp2phost.Host
bootnodes []peer.AddrInfo
topics map[string]*pubsub.Topic
tracker *messageLatencyTracker
}
func NewHost(cfg *config) (*host, error) {
if cfg.KeyFile == "" {
cfg.KeyFile = fmt.Sprintf("node-%d.key", cfg.Index)
}
key, err := loadKey(cfg.KeyFile)
if err != nil {
log.Infof("failed to load libp2p key, generating key %s...", cfg.KeyFile)
key, err = generateKey(0, cfg.KeyFile)
if err != nil {
return nil, err
}
}
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port))
if err != nil {
return nil, err
}
opts := []libp2p.Option{
libp2p.ListenAddrs(addr),
libp2p.Identity(key),
libp2p.NATPortMap(),
}
h, err := libp2p.New(opts...)
if err != nil {
return nil, err
}
psOpts := []pubsub.Option{
pubsub.WithMessageIdFn(messageIDFn),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
}
if cfg.dandelion {
psOpts = append(psOpts, pubsub.WithDandelion())
}
ps, err := pubsub.NewGossipSub(cfg.Ctx, h, psOpts...)
if err != nil {
return nil, err
}
topic, err := ps.Join(testTopic)
if err != nil {
return nil, err
}
topics := map[string]*pubsub.Topic{
testTopic: topic,
}
ourCtx, cancel := context.WithCancel(cfg.Ctx)
return &host{
ctx: ourCtx,
cancel: cancel,
h: h,
bootnodes: cfg.Bootnodes,
topics: topics,
tracker: cfg.Tracker,
}, nil
}
func messageIDFn(pmsg *pb.Message) string {
return getMessageIDFromData(pmsg.Data)
}
func getMessageIDFromData(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}
func (h *host) addrInfo() peer.AddrInfo {
return peer.AddrInfo{
ID: h.h.ID(),
Addrs: h.h.Addrs(),
}
}
func (h *host) Start() error {
err := h.bootstrap()
if err != nil {
return err
}
for topic := range h.topics {
go func() {
err := h.receive(topic)
if err != nil {
log.Warnf("receive loop exiting: %s", err)
return
}
}()
}
randDuration, err := rand.Int(rand.Reader, big.NewInt(20))
if err != nil {
return err
}
ticker := time.NewTicker(time.Second * time.Duration(20+randDuration.Int64()))
go func() {
for {
select {
case <-h.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ok := true
go func() {
err := h.publishRandomMessages()
if err != nil {
log.Warnf("send loop exiting: %s", err)
ok = false
return
}
}()
if !ok {
return
}
}
}
}()
return nil
}
func (h *host) Stop() error {
h.cancel()
if err := h.h.Close(); err != nil {
return fmt.Errorf("failed to close libp2p host: %w", err)
}
return nil
}
func (h *host) publishRandomMessages() error {
const messageLength = 16
for topic := range h.topics {
msg := make([]byte, messageLength)
_, err := rand.Read(msg)
if err != nil {
return err
}
err = h.publish(topic, msg)
if err != nil {
return err
}
}
return nil
}
// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() error {
failed := 0
for _, addrInfo := range h.bootnodes {
log.Debugf("bootstrapping to peer: peer=%s", addrInfo.ID)
err := h.h.Connect(h.ctx, addrInfo)
if err != nil {
log.Debugf("failed to bootstrap to peer: err=%s", err)
failed++
}
}
if failed == len(h.bootnodes) && len(h.bootnodes) != 0 {
return errFailedToBootstrap
}
return nil
}
func (h *host) publish(topic string, msg []byte) error {
t, has := h.topics[topic]
if !has {
return errNoTopic
}
id := getMessageIDFromData(msg)
log.Infof("sent message on topic %s: id %s", topic, id)
h.tracker.logSent(id)
return t.Publish(h.ctx, msg)
}
func (h *host) receive(topic string) error {
t, has := h.topics[topic]
if !has {
return errNoTopic
}
sub, err := t.Subscribe()
if err != nil {
return err
}
for {
msg, err := sub.Next(h.ctx)
if err != nil {
return err
}
log.Infof("got message on topic %s: id %s", topic, msg.ID)
h.tracker.logReceived(msg.ID)
}
return nil
}