Files
2024-12-04 14:21:50 -04:00

216 lines
5.0 KiB
Go

package main
import (
"context"
"fmt"
"math"
"math/rand/v2"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-msgio/pbio"
"github.com/multiformats/go-multiaddr"
)
// customProtocolID is the protocol ID under which the stream handler is
// registered on the host and under which outgoing streams are opened.
const customProtocolID = "/custom/1.0.0"
// CustomProtocol relays messages hop by hop between custom nodes over a
// libp2p host and hands payloads addressed to this node to consumers via C.
type CustomProtocol struct {
// host registers the stream handler and opens streams to the next hop.
host host.Host
// mutex is held by the stream handler while it updates hasConn and origin.
mutex sync.Mutex
// hasConn becomes true once the first payload addressed to this node arrives.
hasConn bool
// origin is parsed from the Origin of the first payload addressed to this
// node; Conn uses it as the remote peer of the returned CustomConn.
origin multiaddr.Multiaddr
// parentID is assigned through setParentID; GetRandomPath panics while it
// is empty.
parentID peer.ID
// C receives every payload whose path ends at this node. NewCustomProtocol
// creates it unbuffered.
C chan *Payload
// listOfCustomNodes holds the known custom nodes from which GetRandomPath
// picks relay hops.
listOfCustomNodes []multiaddr.Multiaddr
}
// NewCustomProtocol builds a CustomProtocol on top of host and registers the
// address of every node in listOfCustomNodes in the host's peerstore with a
// permanent TTL.
//
// The node list is copied, so the returned value does not share a backing
// array with the caller's slice. An error is returned as soon as a peer ID
// cannot be obtained from one of the addresses; entries processed before the
// failing one remain registered in the peerstore.
func NewCustomProtocol(host host.Host, listOfCustomNodes []multiaddr.Multiaddr) (*CustomProtocol, error) {
	for _, addr := range listOfCustomNodes {
		peerID, err := GetPeerID(addr)
		if err != nil {
			return nil, fmt.Errorf("resolving peer ID of custom node %v: %w", addr, err)
		}
		host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{addr}, peerstore.PermanentAddrTTL)
	}

	// Detach from the caller's backing array so that later reordering or
	// modification on either side cannot leak to the other.
	nodes := make([]multiaddr.Multiaddr, len(listOfCustomNodes))
	copy(nodes, listOfCustomNodes)

	return &CustomProtocol{
		host:              host,
		C:                 make(chan *Payload),
		listOfCustomNodes: nodes,
	}, nil
}
// Conn blocks until the first payload addressed to this node has been
// received, then returns a CustomConn whose remote peer is the origin
// recorded from that payload.
//
// The state is polled every 100ms. The method takes no context, so the wait
// cannot be cancelled by the caller.
func (m *CustomProtocol) Conn() *CustomConn {
	for {
		// hasConn and origin are written by the stream handler under the
		// mutex; read them under the same lock to avoid a data race.
		m.mutex.Lock()
		ready, origin := m.hasConn, m.origin
		m.mutex.Unlock()

		if ready {
			ctx, cancel := context.WithCancel(context.Background())
			return &CustomConn{
				ctx:          ctx,
				cancel:       cancel,
				myCustomNode: m,
				remotePeer:   origin,
			}
		}
		time.Sleep(100 * time.Millisecond)
	}
}
// setParentID records p as the parent peer ID. GetRandomPath requires this to
// have been called with a non-empty ID.
func (m *CustomProtocol) setParentID(p peer.ID) {
	m.parentID = p
}
// GetRandomPath returns the hops, as multiaddr strings, that a message should
// traverse to reach destination: randomly chosen relay nodes (currently one)
// followed by the destination itself.
//
// Relays are drawn from the known custom nodes, excluding this host and the
// peer named by destination's custom component. The final hop is derived from
// destination by dropping everything from the first "/p2p" onwards and
// rewriting "/custom" to "/p2p".
//
// The stored node list is not modified. The method panics if the parent ID
// has not been set, if there are fewer eligible relays than required, or if
// the rewritten destination is not a valid multiaddr.
func (m *CustomProtocol) GetRandomPath(destination multiaddr.Multiaddr) []string {
	if m.parentID == "" {
		panic("parentID not set")
	}
	fmt.Println("DESTINATION", destination)

	// Number of relay hops placed in front of the destination.
	const relayCount = 1

	// Best effort: when the destination carries no decodable custom peer ID
	// there is simply no destination peer to exclude from the candidates.
	var destinationPeerID peer.ID
	if idStr, err := destination.ValueForProtocol(P_CUSTOM); err == nil {
		if id, err := peer.Decode(idStr); err == nil {
			destinationPeerID = id
		}
	}

	// Collect eligible relays into a fresh slice so the shuffle below never
	// reorders the shared node list.
	candidates := make([]multiaddr.Multiaddr, 0, len(m.listOfCustomNodes))
	for _, node := range m.listOfCustomNodes {
		nodeID, err := GetPeerID(node)
		if err != nil {
			// A node whose peer ID cannot be determined cannot be compared
			// against the exclusions, so it is not used as a relay.
			continue
		}
		if nodeID == m.host.ID() || nodeID == destinationPeerID {
			continue
		}
		candidates = append(candidates, node)
	}
	if len(candidates) < relayCount {
		panic("More nodes required!!!")
	}
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})

	prefix, _, _ := strings.Cut(destination.String(), "/p2p")
	destinationStr := strings.ReplaceAll(prefix, "/custom", "/p2p")
	destinationMa, err := multiaddr.NewMultiaddr(destinationStr)
	if err != nil {
		// There is no error return; fail loudly here instead of handing out a
		// path whose final hop is unusable.
		panic(fmt.Sprintf("invalid destination multiaddr %q: %v", destinationStr, err))
	}

	result := make([]string, 0, relayCount+1)
	for _, relay := range candidates[:relayCount] {
		result = append(result, relay.String())
	}
	result = append(result, destinationMa.String())

	fmt.Println("THE RANDOM PATH: ", result)
	return result
}
// Start registers the stream handler for customProtocolID on the host.
//
// Each inbound stream is expected to carry one delimited Message. While the
// message still has hops in Path, it is forwarded to the first hop with that
// hop removed from the path. When Path is empty this node is the recipient:
// the first delivery records the payload's origin and marks the protocol as
// connected, and every delivery pushes the payload onto m.C.
//
// ctx is used for forwarding and to abandon a delivery to m.C once it is
// cancelled. Any failure resets the stream; on success the stream is closed.
func (m *CustomProtocol) Start(ctx context.Context) {
	m.host.SetStreamHandler(customProtocolID, func(s network.Stream) {
		fmt.Println("I am ", m.host.ID(), "and received request from ", s.Conn().RemotePeer())

		// NOTE(review): math.MaxInt32 is passed as the reader's maximum
		// message size; consider a tighter bound for untrusted peers.
		reader := pbio.NewDelimitedReader(s, math.MaxInt32)
		message := &Message{}
		if err := reader.ReadMsg(message); err != nil {
			fmt.Println("ERROR reading message", err)
			s.Reset()
			return
		}

		// Intermediate hop: pass the message on to the next node in the path.
		if len(message.Path) > 0 {
			nextNodeMA, err := multiaddr.NewMultiaddr(message.Path[0])
			if err != nil {
				fmt.Println("ERROR building peerID!!!", err)
				s.Reset()
				return
			}
			peerID, err := GetPeerID(nextNodeMA)
			if err != nil {
				fmt.Println("ERROR obtaining peerID!!!", err)
				s.Reset()
				return
			}
			m.host.Peerstore().AddAddr(peerID, nextNodeMA, peerstore.PermanentAddrTTL)

			forwarded := &Message{
				Path:    message.Path[1:], // drop the hop being contacted now
				Payload: message.Payload,
			}
			if err := m.Send(ctx, nextNodeMA.String(), forwarded); err != nil {
				fmt.Println("ERROR sending message", err)
				s.Reset()
				return
			}
			s.Close()
			return
		}

		// Final hop: the payload is for this node.
		if message.Payload == nil {
			// Dereferencing a missing payload below would panic the handler.
			fmt.Println("ERROR message without payload")
			s.Reset()
			return
		}
		fmt.Println("PUSHING TO CHANNEL IN", m.host.ID(), string(message.Payload.Payload))

		m.mutex.Lock()
		if !m.hasConn {
			origin, err := multiaddr.NewMultiaddr(message.Payload.Origin)
			if err != nil {
				// Do not mark the connection as established with an origin
				// that cannot be parsed.
				m.mutex.Unlock()
				fmt.Println("ERROR parsing origin", err)
				s.Reset()
				return
			}
			m.origin = origin
			m.hasConn = true
		}
		m.mutex.Unlock()

		// m.C may have no reader; give up when ctx is cancelled instead of
		// blocking this handler goroutine forever.
		select {
		case m.C <- message.Payload:
			fmt.Println("PUSHED TO CHANNEL IN", m.host.ID())
		case <-ctx.Done():
			fmt.Println("ERROR delivering payload", ctx.Err())
			s.Reset()
			return
		}
		s.Close()
	})
}
// Write sends message on stream as a single delimited message. If the write
// fails the stream is reset and the error is returned; on success the stream
// is left open for the caller.
func (m *CustomProtocol) Write(stream network.Stream, message *Message) error {
	if err := pbio.NewDelimitedWriter(stream).WriteMsg(message); err != nil {
		stream.Reset()
		return err
	}
	return nil
}
// Send opens a stream to the peer identified by nextHopStr, writes message on
// it and closes the stream.
//
// nextHopStr must be a multiaddr string from which GetPeerID can obtain a
// peer ID. The message is sent with its Path unchanged; callers are
// responsible for trimming the path before calling Send.
//
// An error is returned if the address cannot be parsed, the peer ID cannot be
// obtained, the stream cannot be opened, or writing or closing fails. On a
// write failure the stream is reset.
func (m *CustomProtocol) Send(ctx context.Context, nextHopStr string, message *Message) error {
	nextHop, err := multiaddr.NewMultiaddr(nextHopStr)
	if err != nil {
		return fmt.Errorf("parsing next hop %q: %w", nextHopStr, err)
	}
	peerID, err := GetPeerID(nextHop)
	if err != nil {
		return fmt.Errorf("resolving peer ID of next hop %v: %w", nextHop, err)
	}
	fmt.Println("Send: I am", m.host.ID())
	fmt.Println("Send: the next hop:", nextHop)

	stream, err := m.host.NewStream(ctx, peerID, customProtocolID)
	if err != nil {
		// Report the failure: returning nil here would tell the caller that
		// a message which was never sent had been delivered.
		return fmt.Errorf("opening stream to %v: %w", peerID, err)
	}

	// Only Path and Payload are carried over to the outgoing message.
	outgoing := &Message{
		Path:    message.Path,
		Payload: message.Payload,
	}
	fmt.Println("Send: Now writing msg to", peerID)
	// Write resets the stream itself when the write fails.
	if err := m.Write(stream, outgoing); err != nil {
		return fmt.Errorf("writing message to %v: %w", peerID, err)
	}
	fmt.Println("Closing in ", m.host.ID())
	return stream.Close()
}