From f380a0eb8c54bee70ae1f3c49f8f483268e31d62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 8 Sep 2025 10:09:34 -0400 Subject: [PATCH] refactor: remove `exit == destination` (#81) --- examples/poc_gossipsub.nim | 5 +- examples/poc_gossipsub_repeated_runs.nim | 5 +- examples/poc_noresp_ping.nim | 2 +- examples/poc_resp_ping.nim | 4 +- mix.nim | 16 +- mix/entry_connection.nim | 59 ++----- mix/exit_connection.nim | 189 ----------------------- mix/exit_layer.nim | 67 ++------ mix/mix_protocol.nim | 91 ++++------- 9 files changed, 71 insertions(+), 367 deletions(-) delete mode 100644 mix/exit_connection.nim diff --git a/examples/poc_gossipsub.nim b/examples/poc_gossipsub.nim index 9fd37d8..c89869c 100644 --- a/examples/poc_gossipsub.nim +++ b/examples/poc_gossipsub.nim @@ -114,7 +114,10 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc = destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string ): Connection {.gcsafe, raises: [].} = try: - return mixProto.toConnection(destPeerId, codec).get() + let dest = destAddr.valueOr: + debug "No destination address available" + return + return mixProto.toConnection(MixDestination.init(destPeerId, dest), codec).get() except CatchableError as e: error "Error during execution of MixEntryConnection callback: ", err = e.msg return nil diff --git a/examples/poc_gossipsub_repeated_runs.nim b/examples/poc_gossipsub_repeated_runs.nim index c6329d6..43cf577 100644 --- a/examples/poc_gossipsub_repeated_runs.nim +++ b/examples/poc_gossipsub_repeated_runs.nim @@ -116,7 +116,10 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc = destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string ): Connection {.gcsafe, raises: [].} = try: - return mixProto.toConnection(destPeerId, codec).get() + let dest = destAddr.valueOr: + debug "No destination address available" + return + return mixProto.toConnection(MixDestination.init(destPeerId, dest), codec).get() except CatchableError as e: error "Error during execution of MixEntryConnection callback: ", err = e.msg return nil diff --git a/examples/poc_noresp_ping.nim b/examples/poc_noresp_ping.nim index 1e29f36..771114e 100644 --- a/examples/poc_noresp_ping.nim +++ b/examples/poc_noresp_ping.nim @@ -105,7 +105,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} = receiverIndex = senderIndex + 1 let conn = mixProto[senderIndex].toConnection( - Destination.forwardToAddr( + MixDestination.init( nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0] ), NoRespPingCodec, diff --git a/examples/poc_resp_ping.nim b/examples/poc_resp_ping.nim index bea10ce..dc23c06 100644 --- a/examples/poc_resp_ping.nim +++ b/examples/poc_resp_ping.nim @@ -85,7 +85,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} = return # We'll fwd requests, so let's register how should the exit node will read responses - proto.registerFwdReadBehavior(PingCodec, readExactly(32)) + proto.registerDestReadBehavior(PingCodec, readExactly(32)) mixProto.add(proto) @@ -105,7 +105,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} = receiverIndex = senderIndex + 1 let conn = mixProto[senderIndex].toConnection( - Destination.forwardToAddr( + MixDestination.init( nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0] ), PingCodec, diff --git a/mix.nim b/mix.nim index 621c530..062113a 100644 --- a/mix.nim +++ b/mix.nim @@ -15,27 +15,23 @@ export writeMixPubInfoToFile export writeMixNodeInfoToFile export getMixNodeInfo export `new` +export init export getMaxMessageSizeForCodec export deleteNodeInfoFolder export deletePubInfoFolder - -export Destination -export DestinationType -export forwardToAddr -export mixNode - +export MixDestination export MixParameters -export fwdReadBehaviorCb -export registerFwdReadBehavior +export destReadBehaviorCb +export registerDestReadBehavior export MixNodes -proc readLp*(maxSize: int): fwdReadBehaviorCb = +proc readLp*(maxSize: int): destReadBehaviorCb = return proc( conn: Connection ): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} = await conn.readLp(maxSize) -proc readExactly*(nBytes: int): fwdReadBehaviorCb = +proc readExactly*(nBytes: int): destReadBehaviorCb = return proc( conn: Connection ): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} = diff --git a/mix/entry_connection.nim b/mix/entry_connection.nim index 89d54a8..a678e84 100644 --- a/mix/entry_connection.nim +++ b/mix/entry_connection.nim @@ -5,34 +5,8 @@ import ./mix_protocol import ./config from fragmentation import dataSize -type - DestinationType* = enum - MixNode - ForwardAddr - - Destination* = object - peerId*: PeerId - case kind*: DestinationType - of ForwardAddr: - address*: MultiAddress - else: - discard - -proc mixNode*(T: typedesc[Destination], p: PeerId): T = - T(kind: DestinationType.MixNode, peerId: p) - -proc forwardToAddr*(T: typedesc[Destination], p: PeerId, address: MultiAddress): T = - T(kind: DestinationType.ForwardAddr, peerId: p, address: address) - -proc `$`*(d: Destination): string = - case d.kind - of MixNode: - "Destination[MixNode](" & $d.peerId & ")" - of ForwardAddr: - "Destination[ForwardAddr](" & $d.address & "/p2p/" & $d.peerId & ")" - type MixDialer* = proc( - msg: seq[byte], codec: string, destination: Destination + msg: seq[byte], codec: string, destination: MixDestination ): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).} type MixParameters* = object @@ -40,7 +14,7 @@ type MixParameters* = object numSurbs*: Opt[uint8] type MixEntryConnection* = ref object of Connection - destination: Destination + destination: MixDestination codec: string mixDialer: MixDialer params: Opt[MixParameters] @@ -235,7 +209,7 @@ when defined(libp2p_agents_metrics): proc new*( T: typedesc[MixEntryConnection], srcMix: MixProtocol, - destination: Destination, + destination: MixDestination, codec: string, params: Opt[MixParameters], ): T {.raises: [].} = @@ -261,17 +235,11 @@ proc new*( instance.incomingFut = checkForIncoming() instance.mixDialer = proc( - msg: seq[byte], codec: string, dest: Destination + msg: seq[byte], codec: string, dest: MixDestination ): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = try: - let (peerId, destination) = - if dest.kind == DestinationType.MixNode: - (Opt.some(dest.peerId), Opt.none(MixDestination)) - else: - (Opt.none(PeerId), Opt.some(MixDestination.init(dest.peerId, dest.address))) - await srcMix.anonymizeLocalProtocolSend( - instance.incoming, msg, codec, peerId, destination, numSurbs + instance.incoming, msg, codec, dest, numSurbs ) except CatchableError as e: error "Error during execution of anonymizeLocalProtocolSend: ", err = e.msg @@ -284,19 +252,14 @@ proc new*( proc toConnection*( srcMix: MixProtocol, - destination: Destination | PeerId, + destination: MixDestination, codec: string, params: Opt[MixParameters] = Opt.none(MixParameters), ): Result[Connection, string] {.gcsafe, raises: [].} = - let dest = - when destination is PeerId: - Destination.mixNode(destination) + if not srcMix.hasDestReadBehavior(codec): + if params.get(MixParameters()).expectReply.get(false): + return err("no destination read behavior for codec") else: - destination + warn "no destination read behavior for codec", codec - if dest.kind == DestinationType.ForwardAddr and - params.get(MixParameters()).expectReply.get(false) and - not srcMix.hasFwdBehavior(codec): - return err("no forward behavior for codec") - - ok(MixEntryConnection.new(srcMix, dest, codec, params)) + ok(MixEntryConnection.new(srcMix, destination, codec, params)) diff --git a/mix/exit_connection.nim b/mix/exit_connection.nim deleted file mode 100644 index 78744b8..0000000 --- a/mix/exit_connection.nim +++ /dev/null @@ -1,189 +0,0 @@ -import hashes, chronos, libp2p/varint, stew/byteutils -import libp2p/stream/connection -from fragmentation import dataSize - -type MixExitConnection* = ref object of Connection - message: seq[byte] - response: seq[byte] - -method join*( - self: MixExitConnection -): Future[void] {.async: (raises: [CancelledError], raw: true), public.} = - discard - -method readExactly*( - self: MixExitConnection, pbytes: pointer, nbytes: int -): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} = - if nbytes == 0: - return - - if self.message.len < nbytes: - raise newException( - LPStreamError, "Not enough data in to read exactly " & $nbytes & " bytes." - ) - - var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) - for i in 0 ..< nbytes: - pbuffer[i] = self.message[i] - - if nbytes < self.message.len: - self.message = self.message[nbytes .. ^1] - else: - self.isEof = true - self.message = @[] - -# ToDo: Check readLine, readVarint implementations -method readLine*( - self: MixExitConnection, limit = 0, sep = "\r\n" -): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} = - var - lim = if limit <= 0: -1 else: limit - result: seq[byte] = @[] - state = 0 - - while true: - if state < len(sep): - if self.message.len == 0: - raise newException(LPStreamError, "Not enough data to read line.") - - let ch = self.message[0] - self.message.delete(0) - - if byte(sep[state]) == ch: - inc(state) - if state == len(sep): - break - else: - result.add(ch) - state = 0 - - if lim > 0 and len(result) == lim: - break - else: - break - - return cast[string](result) - -method readVarint*( - self: MixExitConnection -): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} = - var - buffer: array[10, byte] - bytesRead = 0 - - while bytesRead < buffer.len: - if self.message.len == 0: - raise newException(LPStreamError, "Not enough data to read varint") - - buffer[bytesRead] = self.message[0] - self.message.delete(0) - bytesRead += 1 - - var - varint: uint64 - length: int - let res = PB.getUVarint(buffer.toOpenArray(0, bytesRead - 1), length, varint) - if res.isOk(): - return varint - if res.error() != VarintError.Incomplete: - break - - raise newException(LPStreamError, "Cannot parse varint") - -method readLp*( - self: MixExitConnection, maxSize: int -): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} = - let - length = await self.readVarint() - maxLen = uint64(if maxSize < 0: int.high else: maxSize) - - if length > maxLen: - raise (ref MaxSizeError)(msg: "Message exceeds maximum length") - - if length == 0: - self.isEof = true - return @[] - - if self.message.len < int(length): - raise newException(LPStreamError, "Not enough data to read " & $length & " bytes.") - - result = self.message[0 ..< int(length)] - if int(length) == self.message.len: - self.isEof = true - self.message = @[] - else: - self.message = self.message[int(length) .. ^1] - return result - -method write*( - self: MixExitConnection, msg: seq[byte] -): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = - self.response.add(msg) - -proc write*( - self: MixExitConnection, msg: string -): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = - self.write(msg.toBytes()) - -method writeLp*( - self: MixExitConnection, msg: openArray[byte] -): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = - if msg.len() > dataSize: - let fut = newFuture[void]() - fut.fail( - newException(LPStreamError, "exceeds max msg size of " & $dataSize & " bytes") - ) - return fut - - var - vbytes: seq[byte] = @[] - value = msg.len().uint64 - - while value >= 128: - vbytes.add(byte((value and 127) or 128)) - value = value shr 7 - vbytes.add(byte(value)) - - var buf = newSeqUninitialized[byte](msg.len() + vbytes.len) - buf[0 ..< vbytes.len] = vbytes.toOpenArray(0, vbytes.len - 1) - buf[vbytes.len ..< buf.len] = msg - - self.write(buf) - -method writeLp*( - self: MixExitConnection, msg: string -): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} = - self.writeLp(msg.toOpenArrayByte(0, msg.high)) - -func shortLog*(self: MixExitConnection): string {.raises: [].} = - "MixExitConnection" - -method initStream*(self: MixExitConnection) = - discard - -method closeImpl*( - self: MixExitConnection -): Future[void] {.async: (raises: [], raw: true).} = - let fut = newFuture[void]() - fut.complete() - return fut - -func hash*(self: MixExitConnection): Hash = - discard - -proc getResponse*(self: MixExitConnection): seq[byte] = - let r = self.response - self.response = @[] - return r - -proc new*(T: typedesc[MixExitConnection], message: seq[byte]): T = - let instance = T(message: message) - - when defined(libp2p_agents_metrics): - instance.shortAgent = connection.shortAgent - - instance - -when defined(libp2p_agents_metrics): - proc setShortAgent*(self: MixExitConnection, shortAgent: string) = - discard diff --git a/mix/exit_layer.nim b/mix/exit_layer.nim index ae06763..0f52412 100644 --- a/mix/exit_layer.nim +++ b/mix/exit_layer.nim @@ -1,57 +1,27 @@ -import std/[enumerate, strutils] +import std/strutils import chronicles, chronos, metrics import libp2p, libp2p/[builders, stream/connection] -import - ./[ - mix_metrics, exit_connection, reply_connection, serialization, utils # fragmentation - ] +import ./[mix_metrics, reply_connection, serialization, utils] type OnReplyDialer* = proc(surb: SURB, message: seq[byte]) {.async: (raises: [CancelledError]).} -type ProtocolHandler* = proc(conn: Connection, codec: string): Future[void] {. - async: (raises: [CancelledError]) -.} - -type fwdReadBehaviorCb* = proc(conn: Connection): Future[seq[byte]] {. +type destReadBehaviorCb* = proc(conn: Connection): Future[seq[byte]] {. async: (raises: [CancelledError, LPStreamError]) .} type ExitLayer* = object switch: Switch - pHandler: ProtocolHandler onReplyDialer: OnReplyDialer - fwdRBehavior: TableRef[string, fwdReadBehaviorCb] - -proc callHandler( - switch: Switch, conn: Connection, codec: string -): Future[void] {.async: (raises: [CatchableError]).} = - for index, handler in enumerate(switch.ms.handlers): - if codec in handler.protos: - await handler.protocol.handler(conn, codec) - return - - error "Handler doesn't exist", codec = codec + destReadBehavior: TableRef[string, destReadBehaviorCb] proc init*( T: typedesc[ExitLayer], switch: Switch, onReplyDialer: OnReplyDialer, - fwdRBehavior: TableRef[string, fwdReadBehaviorCb], + destReadBehavior: TableRef[string, destReadBehaviorCb], ): T = - ExitLayer( - switch: switch, - onReplyDialer: onReplyDialer, - fwdRBehavior: fwdRBehavior, - pHandler: proc( - conn: Connection, codec: string - ): Future[void] {.async: (raises: [CancelledError]).} = - try: - await callHandler(switch, conn, codec) - except CatchableError as e: - error "Error during execution of MixProtocol handler: ", err = e.msg - , - ) + ExitLayer(switch: switch, onReplyDialer: onReplyDialer, destReadBehavior: destReadBehavior) proc replyDialerCbFactory(self: ExitLayer): MixReplyDialer = return proc( @@ -84,26 +54,11 @@ proc reply( error "could not reply", description = exc.msg mix_messages_error.inc(labelValues = ["ExitLayer", "REPLY_FAILED"]) -proc runHandler( - self: ExitLayer, codec: string, message: seq[byte], surbs: seq[SURB] -) {.async: (raises: [CancelledError]).} = - let exitConn = MixExitConnection.new(message) - defer: - if not exitConn.isNil: - await exitConn.close() - - await self.pHandler(exitConn, codec) - - if surbs.len != 0: - let response = exitConn.getResponse() - await self.reply(surbs, response) - proc onMessage*( self: ExitLayer, codec: string, message: seq[byte], nextHop: Hop, surbs: seq[SURB] ) {.async: (raises: [CancelledError]).} = if nextHop == Hop(): - trace "onMessage - exit is destination", codec, message - await self.runHandler(codec, message, surbs) + error "no destination available" return # Forward to destination @@ -140,13 +95,13 @@ proc onMessage*( await destConn.write(message) if surbs.len != 0: - if not self.fwdRBehavior.hasKey(codec): - error "No fwdRBehavior for codec", codec + if not self.destReadBehavior.hasKey(codec): + error "No destReadBehavior for codec", codec return - var behaviorCb: fwdReadBehaviorCb + var behaviorCb: destReadBehaviorCb try: - behaviorCb = self.fwdRBehavior[codec] + behaviorCb = self.destReadBehavior[codec] except KeyError: doAssert false, "checked with HasKey" diff --git a/mix/mix_protocol.nim b/mix/mix_protocol.nim index c927550..b7f5070 100644 --- a/mix/mix_protocol.nim +++ b/mix/mix_protocol.nim @@ -2,8 +2,8 @@ import chronicles, chronos, sequtils, strutils, os, results import std/[strformat, sysrand, tables], metrics import ./[ - config, curve25519, exit_connection, fragmentation, mix_message, mix_node, sphinx, - serialization, tag_manager, utils, mix_metrics, exit_layer, + config, curve25519, fragmentation, mix_message, mix_node, sphinx, serialization, + tag_manager, utils, mix_metrics, exit_layer, ] import libp2p import @@ -25,15 +25,15 @@ type MixProtocol* = ref object of LPProtocol rng: ref HmacDrbgContext # TODO: verify if this requires cleanup for cases in which response never arrives (and connection is closed) connCreds: Table[I, ConnCreds] - fwdRBehavior: TableRef[string, fwdReadBehaviorCb] + destReadBehavior: TableRef[string, destReadBehaviorCb] -proc hasFwdBehavior*(mixProto: MixProtocol, codec: string): bool = - return mixProto.fwdRBehavior.hasKey(codec) +proc hasDestReadBehavior*(mixProto: MixProtocol, codec: string): bool = + return mixProto.destReadBehavior.hasKey(codec) -proc registerFwdReadBehavior*( - mixProto: MixProtocol, codec: string, fwdBehavior: fwdReadBehaviorCb +proc registerDestReadBehavior*( + mixProto: MixProtocol, codec: string, fwdBehavior: destReadBehaviorCb ) = - mixProto.fwdRBehavior[codec] = fwdBehavior + mixProto.destReadBehavior[codec] = fwdBehavior proc loadMixNodeInfo*( index: int, nodeFolderInfoPath: string = "./nodeInfo" @@ -406,17 +406,9 @@ proc anonymizeLocalProtocolSend*( incoming: AsyncQueue[seq[byte]], msg: seq[byte], codec: string, - destPeerId: Opt[PeerId], - fwdDestination: Opt[MixDestination], + destination: MixDestination, numSurbs: uint8, ) {.async.} = - ## destPeerId: use when dest == exit - ## fwdDestination: use when dest != exit - - doAssert (destPeerId.isSome and fwdDestination.isNone) or - (destPeerId.isNone and fwdDestination.isSome), - "specify either the destPeerId or destination but not both" - let (multiAddr, _, _, _, _) = getMixNodeInfo(mixProto.mixNodeInfo) mix_messages_recvd.inc(labelValues = ["Entry"]) @@ -432,11 +424,9 @@ proc anonymizeLocalProtocolSend*( let numMixNodes = mixProto.pubNodeInfo.len var numAvailableNodes = numMixNodes - info "Destination data", destPeerId, fwdDestination + debug "Destination data", destination - let skipDest = destPeerId.valueOr: - fwdDestination.value.peerId - if mixProto.pubNodeInfo.hasKey(skipDest): + if mixProto.pubNodeInfo.hasKey(destination.peerId): numAvailableNodes = numMixNodes - 1 if numAvailableNodes < L: @@ -450,35 +440,22 @@ proc anonymizeLocalProtocolSend*( randPeerId: PeerId availableIndices = toSeq(0 ..< numMixNodes) - if destPeerId.isSome: - let index = pubNodeInfoKeys.find(destPeerId.value()) - if index != -1: - availableIndices.del(index) - else: - error "Destination does not support Mix" - return - var i = 0 while i < L: - if destPeerId.isSome and i == L - 1: - randPeerId = destPeerId.value() - exitPeerId = destPeerId.value() - else: - let randomIndexPosition = cryptoRandomInt(availableIndices.len).valueOr: - error "Failed to genanrate random number", err = error - mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"]) - return - let selectedIndex = availableIndices[randomIndexPosition] - randPeerId = pubNodeInfoKeys[selectedIndex] - availableIndices.del(randomIndexPosition) + let randomIndexPosition = cryptoRandomInt(availableIndices.len).valueOr: + error "Failed to genanrate random number", err = error + mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"]) + return + let selectedIndex = availableIndices[randomIndexPosition] + randPeerId = pubNodeInfoKeys[selectedIndex] + availableIndices.del(randomIndexPosition) - if fwdDestination.isSome: - # Skip the destination peer - if randPeerId == fwdDestination.value().peerId: - continue - # Last hop will be the exit node that will forward the request - if i == L - 1: - exitPeerId = randPeerId + # Skip the destination peer + if randPeerId == destination.peerId: + continue + # Last hop will be the exit node that will forward the request + if i == L - 1: + exitPeerId = randPeerId debug "Selected mix node: ", indexInPath = i, peerId = randPeerId @@ -510,16 +487,12 @@ proc anonymizeLocalProtocolSend*( i = i + 1 - let destHop = - if fwdDestination.isSome: - #Encode destination - let destAddrBytes = multiAddrToBytes($fwdDestination.value()).valueOr: - error "Failed to convert multiaddress to bytes", err = error - mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"]) - return - Hop.init(destAddrBytes) - else: - Hop() + #Encode destination + let destAddrBytes = multiAddrToBytes($destination).valueOr: + error "Failed to convert multiaddress to bytes", err = error + mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"]) + return + let destHop = Hop.init(destAddrBytes) let msgWithSurbs = mixProto.prepareMsgWithSurbs(incoming, msg, numSurbs, exitPeerId).valueOr: error "Could not prepend SURBs", err = error @@ -570,14 +543,14 @@ proc new*( mixProto.pubNodeInfo = pubNodeInfo mixProto.switch = switch mixProto.tagManager = tagManager - mixProto.fwdRBehavior = newTable[string, fwdReadBehaviorCb]() + mixProto.destReadBehavior = newTable[string, destReadBehaviorCb]() let onReplyDialer = proc( surb: SURB, message: seq[byte] ) {.async: (raises: [CancelledError]).} = await mixProto.reply(surb, message) - mixProto.exitLayer = ExitLayer.init(switch, onReplyDialer, mixProto.fwdRBehavior) + mixProto.exitLayer = ExitLayer.init(switch, onReplyDialer, mixProto.destReadBehavior) mixProto.codecs = @[MixProtocolID] mixProto.rng = rng mixProto.handler = proc(