mirror of
https://github.com/vacp2p/mix.git
synced 2026-01-08 04:53:55 -05:00
177 lines
5.4 KiB
Nim
177 lines
5.4 KiB
Nim
import chronicles, sequtils, strutils, chronos, results
|
|
import std/[enumerate, strformat]
|
|
import ../mix
|
|
import libp2p
|
|
import
|
|
libp2p/[
|
|
crypto/secp,
|
|
protocols/pubsub/gossipsub,
|
|
protocols/pubsub/pubsubpeer,
|
|
protocols/pubsub/rpc/messages,
|
|
]
|
|
import ./poc_gossipsub_utils
|
|
|
|
type Node = tuple[switch: Switch, gossip: GossipSub, mix: MixProtocol, id: int]
|
|
|
|
proc createSwitch(libp2pPrivKey: SkPrivateKey, multiAddr: MultiAddress): Switch =
|
|
result = SwitchBuilder
|
|
.new()
|
|
.withPrivateKey(PrivateKey(scheme: Secp256k1, skkey: libp2pPrivKey))
|
|
.withAddress(multiAddr)
|
|
.withRng(crypto.newRng())
|
|
.withYamux()
|
|
.withTcpTransport()
|
|
.withNoise()
|
|
.build()
|
|
|
|
proc connectNodesTCP(nodes: seq[Node]) {.async.} =
|
|
for index, node in nodes:
|
|
for otherNodeIdx in index - 1 .. index + 2:
|
|
if otherNodeIdx notin 0 ..< nodes.len or otherNodeIdx == index:
|
|
continue
|
|
let
|
|
otherNode = nodes[otherNodeIdx]
|
|
tcpAddr = otherNode.switch.peerInfo.addrs.filterIt(TCP.match(it))
|
|
|
|
if tcpAddr.len > 0:
|
|
try:
|
|
await node.switch.connect(otherNode.switch.peerInfo.peerId, tcpAddr)
|
|
except CatchableError as e:
|
|
warn "Failed to connect nodes", src = index, dst = otherNodeIdx, error = e.msg
|
|
|
|
proc setUpNodes(numNodes: int): seq[Switch] =
|
|
# This is not actually GC-safe
|
|
{.gcsafe.}:
|
|
# Initialize mix nodes
|
|
let mixNodes = initializeMixNodes(numNodes).valueOr:
|
|
error "Could not initialize mixnodes", err = error
|
|
return
|
|
|
|
var nodes: seq[Switch] = @[]
|
|
|
|
for index, node in enumerate(mixNodes):
|
|
# Write public info of all mix nodes
|
|
let nodePubInfo = mixNodes.getMixPubInfoByIndex(index).valueOr:
|
|
error "Get mix pub info by index error", err = error
|
|
continue
|
|
|
|
let writePubRes = writeMixPubInfoToFile(nodePubInfo, index)
|
|
if writePubRes.isErr:
|
|
error "Failed to write pub info to file", nodeIndex = index
|
|
continue
|
|
|
|
# Write info of all mix nodes
|
|
let writeNodeRes = writeMixNodeInfoToFile(node, index)
|
|
if writeNodeRes.isErr:
|
|
error "Failed to write mix info to file", nodeIndex = index
|
|
continue
|
|
|
|
# Extract private key and multiaddress
|
|
let (multiAddrStr, _, _, _, libp2pPrivKey) = getMixNodeInfo(node)
|
|
|
|
let multiAddr = MultiAddress.init(multiAddrStr.split("/p2p/")[0]).valueOr:
|
|
error "Failed to initialize MultiAddress", err = error
|
|
return
|
|
|
|
# Create switch
|
|
let switch = createSwitch(libp2pPrivKey, multiAddr)
|
|
if not switch.isNil:
|
|
nodes.add(switch)
|
|
else:
|
|
warn "Failed to set up node", nodeIndex = index
|
|
|
|
return nodes
|
|
|
|
proc oneNode(node: Node) {.async.} =
|
|
node.gossip.addValidator(
|
|
["message"],
|
|
proc(topic: string, message: Message): Future[ValidationResult] {.async.} =
|
|
return ValidationResult.Accept,
|
|
)
|
|
|
|
if node.id == 0:
|
|
let handler = proc(topic: string, data: seq[byte]) {.async.} =
|
|
info "Message received", nodeId = node.id, msg = cast[string](data)
|
|
|
|
node.gossip.subscribe("message", handler)
|
|
else:
|
|
node.gossip.subscribe("message", nil)
|
|
|
|
for msgNum in 0 ..< 5:
|
|
await sleepAsync(500.milliseconds)
|
|
let msg = fmt"Hello from Node {node.id}, Message No: {msgNum + 1}"
|
|
discard await node.gossip.publish(
|
|
"message",
|
|
cast[seq[byte]](msg),
|
|
publishParams = some(PublishParams(skipMCache: true, useCustomConn: true)),
|
|
)
|
|
|
|
await sleepAsync(1000.milliseconds)
|
|
await node.switch.stop()
|
|
|
|
proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
|
|
return proc(
|
|
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
|
|
): Connection {.gcsafe, raises: [].} =
|
|
try:
|
|
let dest = destAddr.valueOr:
|
|
debug "No destination address available"
|
|
return
|
|
return mixProto.toConnection(MixDestination.init(destPeerId, dest), codec).get()
|
|
except CatchableError as e:
|
|
error "Error during execution of MixEntryConnection callback: ", err = e.msg
|
|
return nil
|
|
|
|
proc mixnet_gossipsub_test() {.async: (raises: [Exception]).} =
|
|
let
|
|
numberOfNodes = 5
|
|
switch = setUpNodes(numberOfNodes)
|
|
var nodes: seq[Node]
|
|
|
|
for i in 0 ..< numberOfNodes:
|
|
let mixProto = MixProtocol.new(i, numberOfNodes, switch[i]).valueOr:
|
|
error "Mix protocol initialization failed", err = error
|
|
return
|
|
|
|
let mixConnCb = makeMixConnCb(mixProto)
|
|
|
|
let mixPeerSelect = proc(
|
|
allPeers: HashSet[PubSubPeer],
|
|
directPeers: HashSet[PubSubPeer],
|
|
meshPeers: HashSet[PubSubPeer],
|
|
fanoutPeers: HashSet[PubSubPeer],
|
|
): HashSet[PubSubPeer] {.gcsafe, raises: [].} =
|
|
try:
|
|
return mixPeerSelection(allPeers, directPeers, meshPeers, fanoutPeers)
|
|
except CatchableError as e:
|
|
error "Error during execution of MixPeerSelection callback: ", err = e.msg
|
|
return initHashSet[PubSubPeer]()
|
|
|
|
let gossip = GossipSub.init(
|
|
switch = switch[i],
|
|
triggerSelf = true,
|
|
customConnCallbacks = some(
|
|
CustomConnectionCallbacks(
|
|
customConnCreationCB: mixConnCb, customPeerSelectionCB: mixPeerSelect
|
|
)
|
|
),
|
|
)
|
|
switch[i].mount(gossip)
|
|
switch[i].mount(mixProto)
|
|
await switch[i].start()
|
|
nodes.add((switch[i], gossip, mixProto, i))
|
|
|
|
await connectNodesTCP(nodes)
|
|
|
|
var allFuts: seq[Future[void]]
|
|
for node in nodes:
|
|
allFuts.add(oneNode(node))
|
|
|
|
await allFutures(allFuts)
|
|
|
|
deleteNodeInfoFolder()
|
|
deletePubInfoFolder()
|
|
|
|
when isMainModule:
|
|
waitFor(mixnet_gossipsub_test())
|