mirror of
https://github.com/vacp2p/go-transport-experiment.git
synced 2026-01-06 22:13:53 -05:00
216 lines
5.0 KiB
Go
216 lines
5.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"math/rand/v2"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
|
"github.com/libp2p/go-msgio/pbio"
|
|
"github.com/multiformats/go-multiaddr"
|
|
)
|
|
|
|
const customProtocolID = "/custom/1.0.0"
|
|
|
|
type CustomProtocol struct {
|
|
host host.Host
|
|
|
|
mutex sync.Mutex
|
|
hasConn bool
|
|
origin multiaddr.Multiaddr
|
|
|
|
parentID peer.ID
|
|
C chan *Payload
|
|
listOfCustomNodes []multiaddr.Multiaddr
|
|
}
|
|
|
|
func NewCustomProtocol(host host.Host, listOfCustomNodes []multiaddr.Multiaddr) (*CustomProtocol, error) {
|
|
for _, x := range listOfCustomNodes {
|
|
peerID, err := GetPeerID(x)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{x}, peerstore.PermanentAddrTTL)
|
|
}
|
|
return &CustomProtocol{
|
|
host: host,
|
|
C: make(chan *Payload),
|
|
listOfCustomNodes: listOfCustomNodes,
|
|
}, nil
|
|
}
|
|
|
|
func (m *CustomProtocol) Conn() *CustomConn {
|
|
for {
|
|
if m.hasConn {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &CustomConn{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
myCustomNode: m,
|
|
remotePeer: m.origin,
|
|
}
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (m *CustomProtocol) setParentID(p peer.ID) {
|
|
m.parentID = p
|
|
}
|
|
|
|
func (m *CustomProtocol) GetRandomPath(destination multiaddr.Multiaddr) []string {
|
|
if m.parentID == "" {
|
|
panic("parentID not set")
|
|
}
|
|
|
|
rand.Shuffle(len(m.listOfCustomNodes), func(i, j int) {
|
|
m.listOfCustomNodes[i], m.listOfCustomNodes[j] = m.listOfCustomNodes[j], m.listOfCustomNodes[i]
|
|
})
|
|
|
|
fmt.Println("DESTINATION", destination)
|
|
|
|
// Remove both the origin and destination
|
|
var filteredSlice []multiaddr.Multiaddr
|
|
destinationPeerIdStr, _ := destination.ValueForProtocol(P_CUSTOM)
|
|
destinationPeerId, _ := peer.Decode(destinationPeerIdStr)
|
|
for _, v := range m.listOfCustomNodes {
|
|
vpeerID, _ := GetPeerID(v)
|
|
if vpeerID == m.host.ID() || vpeerID == destinationPeerId {
|
|
continue
|
|
}
|
|
filteredSlice = append(filteredSlice, v)
|
|
}
|
|
|
|
// Return random 3 nodes
|
|
n := 1
|
|
if len(filteredSlice) < n {
|
|
panic("More nodes required!!!")
|
|
}
|
|
|
|
destinationStrParts := strings.Split(destination.String(), "/p2p")
|
|
destinationStr := strings.Replace(destinationStrParts[0], "/custom", "/p2p", -1)
|
|
destinationMa, _ := multiaddr.NewMultiaddr(destinationStr)
|
|
|
|
resultMa := append(filteredSlice[:n], destinationMa)
|
|
result := make([]string, len(resultMa))
|
|
for i, addr := range resultMa {
|
|
result[i] = addr.String()
|
|
}
|
|
|
|
fmt.Println("THE RANDOM PATH: ", result)
|
|
|
|
return result
|
|
}
|
|
|
|
func (m *CustomProtocol) Start(ctx context.Context) {
|
|
|
|
m.host.SetStreamHandler(customProtocolID, func(s network.Stream) {
|
|
fmt.Println("I am ", m.host.ID(), "and received request from ", s.Conn().RemotePeer())
|
|
reader := pbio.NewDelimitedReader(s, math.MaxInt32)
|
|
|
|
message := &Message{}
|
|
reader.ReadMsg(message)
|
|
|
|
if len(message.Path) > 0 {
|
|
nextNode := message.Path[0]
|
|
nextNodeMA, err := multiaddr.NewMultiaddr(nextNode)
|
|
if err != nil {
|
|
fmt.Println("ERROR building peerID!!!", err)
|
|
s.Reset()
|
|
return
|
|
}
|
|
|
|
peerID, err := GetPeerID(nextNodeMA)
|
|
if err != nil {
|
|
fmt.Println("ERROR obtaining peerID!!!", err)
|
|
s.Reset()
|
|
return
|
|
}
|
|
|
|
m.host.Peerstore().AddAddr(peerID, nextNodeMA, peerstore.PermanentAddrTTL)
|
|
|
|
message := &Message{
|
|
Path: message.Path[1:], // Remove current node from the path
|
|
Payload: message.Payload,
|
|
}
|
|
|
|
err = m.Send(ctx, nextNodeMA.String(), message)
|
|
if err != nil {
|
|
fmt.Println("ERROR sending message", err)
|
|
s.Reset()
|
|
return
|
|
}
|
|
} else {
|
|
fmt.Println("PUSHING TO CHANNEL IN", m.host.ID(), string(message.Payload.Payload))
|
|
|
|
m.mutex.Lock()
|
|
if !m.hasConn {
|
|
m.origin, _ = multiaddr.NewMultiaddr(message.Payload.Origin)
|
|
m.hasConn = true
|
|
}
|
|
m.mutex.Unlock()
|
|
|
|
m.C <- message.Payload
|
|
|
|
fmt.Println("PUSHED TO CHANNEL IN", m.host.ID())
|
|
}
|
|
|
|
s.Close()
|
|
})
|
|
}
|
|
|
|
func (m *CustomProtocol) Write(stream network.Stream, message *Message) error {
|
|
writer := pbio.NewDelimitedWriter(stream)
|
|
err := writer.WriteMsg(message)
|
|
if err != nil {
|
|
stream.Reset()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *CustomProtocol) Send(ctx context.Context, nextHopStr string, message *Message) error {
|
|
nextHop, err := multiaddr.NewMultiaddr(nextHopStr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
peerID, err := GetPeerID(nextHop)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fmt.Println("Send: I am", m.host.ID())
|
|
fmt.Println("Send: the next hop:", nextHop)
|
|
|
|
stream, err := m.host.NewStream(ctx, peerID, customProtocolID)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
writer := pbio.NewDelimitedWriter(stream)
|
|
|
|
message = &Message{
|
|
Path: message.Path, // Remove current node from the path
|
|
Payload: message.Payload,
|
|
}
|
|
|
|
fmt.Println("Send: Now writing msg to", peerID)
|
|
err = writer.WriteMsg(message)
|
|
if err != nil {
|
|
stream.Reset()
|
|
return err
|
|
}
|
|
|
|
fmt.Println("Closing in ", m.host.ID())
|
|
return stream.Close()
|
|
}
|