import chronicles, sequtils, strutils, chronos, results
import std/[enumerate, strformat]
import ../mix
import libp2p
import
  libp2p/[
    crypto/secp,
    protocols/pubsub/gossipsub,
    protocols/pubsub/pubsubpeer,
    protocols/pubsub/rpc/messages,
  ]
import ./poc_gossipsub_utils

type Node = tuple[switch: Switch, gossip: GossipSub, mix: MixProtocol, id: int]
  ## Everything belonging to one participant: its switch, its GossipSub
  ## instance, its mix protocol instance and its index.

proc createSwitch(libp2pPrivKey: SkPrivateKey, multiAddr: MultiAddress): Switch =
  ## Builds a switch for the given secp256k1 private key that listens on
  ## `multiAddr`, configured with Yamux, the TCP transport and Noise.
  SwitchBuilder
  .new()
  .withPrivateKey(PrivateKey(scheme: Secp256k1, skkey: libp2pPrivKey))
  .withAddress(multiAddr)
  .withRng(crypto.newRng())
  .withYamux()
  .withTcpTransport()
  .withNoise()
  .build()

proc connectNodesTCP(nodes: seq[Node]) {.async.} =
  ## Dials, from every node, the nodes at index offsets -1, +1 and +2 (when
  ## those indices exist), using the TCP addresses of the dialed node.
  ## Connection failures are logged as warnings and otherwise ignored.
  for index, node in nodes:
    for otherNodeIdx in index - 1 .. index + 2:
      # Skip out-of-range neighbours and the node itself.
      if otherNodeIdx notin 0 ..< nodes.len or otherNodeIdx == index:
        continue

      let
        otherNode = nodes[otherNodeIdx]
        tcpAddr = otherNode.switch.peerInfo.addrs.filterIt(TCP.match(it))

      if tcpAddr.len > 0:
        try:
          await node.switch.connect(otherNode.switch.peerInfo.peerId, tcpAddr)
        except CatchableError as e:
          warn "Failed to connect nodes", src = index, dst = otherNodeIdx, error = e.msg

proc setUpNodes(numNodes: int): seq[Switch] =
  ## Initializes `numNodes` mix nodes, writes each node's public info and node
  ## info to file, and builds one switch per node.
  ##
  ## A node whose info cannot be fetched or written is logged and skipped, so
  ## the returned seq may hold fewer than `numNodes` switches. If mix node
  ## initialization or multiaddress parsing fails, an empty seq is returned.
  # This is not actually GC-safe
  {.gcsafe.}:
    # Initialize mix nodes
    let mixNodes = initializeMixNodes(numNodes).valueOr:
      error "Could not initialize mixnodes", err = error
      return

    var nodes: seq[Switch] = @[]

    for index, node in enumerate(mixNodes):
      # Write public info of all mix nodes
      let nodePubInfo = mixNodes.getMixPubInfoByIndex(index).valueOr:
        error "Get mix pub info by index error", err = error
        continue

      let writePubRes = writeMixPubInfoToFile(nodePubInfo, index)
      if writePubRes.isErr:
        error "Failed to write pub info to file", nodeIndex = index
        continue

      # Write info of all mix nodes
      let writeNodeRes = writeMixNodeInfoToFile(node, index)
      if writeNodeRes.isErr:
        error "Failed to write mix info to file", nodeIndex = index
        continue

      # Extract private key and multiaddress
      let (multiAddrStr, _, _, _, libp2pPrivKey) = getMixNodeInfo(node)

      # Only the part of the address before "/p2p/" is used as listen address.
      let multiAddr = MultiAddress.init(multiAddrStr.split("/p2p/")[0]).valueOr:
        error "Failed to initialize MultiAddress", err = error
        return

      # Create switch
      let switch = createSwitch(libp2pPrivKey, multiAddr)
      if not switch.isNil:
        nodes.add(switch)
      else:
        warn "Failed to set up node", nodeIndex = index

    return nodes

proc oneNode(node: Node) {.async.} =
  ## Runs the scenario for a single node: registers an accept-all validator
  ## for the "message" topic, subscribes to it (only node 0 logs what it
  ## receives), publishes five messages 500 ms apart, waits one more second
  ## and then stops the node's switch.
  node.gossip.addValidator(
    ["message"],
    proc(topic: string, message: Message): Future[ValidationResult] {.async.} =
      return ValidationResult.Accept,
  )

  if node.id == 0:
    let handler = proc(topic: string, data: seq[byte]) {.async.} =
      info "Message received", nodeId = node.id, msg = cast[string](data)
    node.gossip.subscribe("message", handler)
  else:
    node.gossip.subscribe("message", nil)

  for msgNum in 0 ..< 5:
    await sleepAsync(500.milliseconds)
    let msg = fmt"Hello from Node {node.id}, Message No: {msgNum + 1}"
    discard await node.gossip.publish(
      "message",
      cast[seq[byte]](msg),
      publishParams = some(PublishParams(skipMCache: true, useCustomConn: true)),
    )

  await sleepAsync(1000.milliseconds)
  await node.switch.stop()

proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
  ## Returns a connection-creation callback that asks `mixProto` for a
  ## connection to the given destination peer/address and codec.
  ##
  ## The callback returns `nil` when no destination address is supplied, when
  ## the connection cannot be created, or when a `CatchableError` is raised.
  return proc(
      destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
  ): Connection {.gcsafe, raises: [].} =
    try:
      let dest = destAddr.valueOr:
        debug "No destination address available"
        return

      let mixDest = MixDestination.init(destPeerId, dest)
      # Unwrap with `valueOr` rather than `get()`: `get()` on a failed result
      # raises a Defect, which the handler below would not catch.
      let conn = mixProto.toConnection(mixDest, codec).valueOr:
        error "Could not create mix connection"
        return nil
      return conn
    except CatchableError as e:
      error "Error during execution of MixEntryConnection callback: ", err = e.msg
      return nil

proc mixnet_gossipsub_test() {.async: (raises: [Exception]).} =
  ## Sets up five nodes, mounts GossipSub (with custom connection-creation and
  ## peer-selection callbacks) and the mix protocol on each, starts and
  ## connects them, then runs `oneNode` on all of them concurrently.
  ##
  ## The node-info and pub-info folders are deleted on every exit path.
  let
    numberOfNodes = 5
    switch = setUpNodes(numberOfNodes)

  try:
    # `setUpNodes` may skip nodes or bail out early; indexing `switch[i]`
    # below is only valid when every node got a switch.
    if switch.len != numberOfNodes:
      error "Could not set up all nodes",
        expected = numberOfNodes, created = switch.len
      return

    # The peer-selection callback captures nothing from the loop, so it is
    # created once and shared by all GossipSub instances.
    let mixPeerSelect = proc(
        allPeers: HashSet[PubSubPeer],
        directPeers: HashSet[PubSubPeer],
        meshPeers: HashSet[PubSubPeer],
        fanoutPeers: HashSet[PubSubPeer],
    ): HashSet[PubSubPeer] {.gcsafe, raises: [].} =
      try:
        return mixPeerSelection(allPeers, directPeers, meshPeers, fanoutPeers)
      except CatchableError as e:
        error "Error during execution of MixPeerSelection callback: ", err = e.msg
        return initHashSet[PubSubPeer]()

    var nodes: seq[Node]

    for i in 0 ..< numberOfNodes:
      let mixProto = MixProtocol.new(i, numberOfNodes, switch[i]).valueOr:
        error "Mix protocol initialization failed", err = error
        return

      let mixConnCb = makeMixConnCb(mixProto)

      let gossip = GossipSub.init(
        switch = switch[i],
        triggerSelf = true,
        customConnCallbacks = some(
          CustomConnectionCallbacks(
            customConnCreationCB: mixConnCb, customPeerSelectionCB: mixPeerSelect
          )
        ),
      )

      switch[i].mount(gossip)
      switch[i].mount(mixProto)
      await switch[i].start()
      nodes.add((switch[i], gossip, mixProto, i))

    await connectNodesTCP(nodes)

    var allFuts: seq[Future[void]]
    for node in nodes:
      allFuts.add(oneNode(node))

    await allFutures(allFuts)
  finally:
    # Remove the files written by `setUpNodes`, also after an early return.
    deleteNodeInfoFolder()
    deletePubInfoFolder()

when isMainModule:
  waitFor(mixnet_gossipsub_test())