refactor: exit layer (#64)

This commit is contained in:
richΛrd
2025-07-28 10:04:14 -04:00
committed by GitHub
parent c4f3d2d511
commit 3290358585
6 changed files with 113 additions and 204 deletions

View File

@@ -83,11 +83,11 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
for index, _ in enumerate(nodes):
noRespPingProto.add(noresp_ping.NoRespPing.new(rng = rng))
let protoRes = MixProtocol.new(index, numberOfNodes, nodes[index])
if protoRes.isErr:
error "Mix protocol initialization failed", err = protoRes.error
let proto = MixProtocol.new(index, numberOfNodes, nodes[index]).valueOr:
error "Mix protocol initialization failed", err = error
return
mixProto.add(protoRes.get())
mixProto.add(proto)
nodes[index].mount(noRespPingProto[index])
nodes[index].mount(mixProto[index])

93
mix/exit_layer.nim Normal file
View File

@@ -0,0 +1,93 @@
import std/[enumerate, strutils]
import chronicles, chronos, metrics
import libp2p, libp2p/[builders, stream/connection]
import ./[mix_metrics, exit_connection, serialization, utils]
type ProtocolHandler* = proc(conn: Connection, codec: string): Future[void] {.
async: (raises: [CancelledError])
.}
type ExitLayer* = object
switch: Switch
pHandler: ProtocolHandler
proc callHandler(
switch: Switch, conn: Connection, codec: string
): Future[void] {.async: (raises: [CatchableError]).} =
for index, handler in enumerate(switch.ms.handlers):
if codec in handler.protos:
await handler.protocol.handler(conn, codec)
return
error "Handler doesn't exist", codec = codec
proc init*(T: typedesc[ExitLayer], switch: Switch): T =
ExitLayer(
switch: switch,
pHandler: proc(
conn: Connection, codec: string
): Future[void] {.async: (raises: [CancelledError]).} =
try:
await callHandler(switch, conn, codec)
except CatchableError as e:
error "Error during execution of MixProtocol handler: ", err = e.msg
,
)
proc runHandler(
self: ExitLayer, codec: string, message: seq[byte]
) {.async: (raises: [CancelledError]).} =
trace "Received: ", receiver = multiAddr, codec, message
let exitConn = MixExitConnection.new(message)
await self.pHandler(exitConn, codec)
try:
await exitConn.close()
except CatchableError as e:
error "Failed to close exit connection: ", err = e.msg
return
proc onMessage*(
self: ExitLayer, codec: string, message: seq[byte], nextHop: Hop
) {.async: (raises: [CancelledError]).} =
if nextHop == Hop():
await self.runHandler(codec, message)
return
# Forward to destination
let destBytes = getHop(nextHop)
let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
# Create MultiAddress and PeerId
let locationAddr = MultiAddress.init(parts[0]).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
let peerId = PeerId.init(parts[1]).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
var destConn: Connection
try:
destConn = await self.switch.dial(peerId, @[locationAddr], codec)
await destConn.writeLp(message)
#TODO: When response is implemented, we can read the response here
except CatchableError as e:
error "Failed to dial next hop: ", err = e.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "DIAL_FAILED"])
finally:
if not destConn.isNil:
await destConn.close()

View File

@@ -1,5 +1,4 @@
import chronicles, results
import ./[config, utils]
import stew/[byteutils, leb128]
import libp2p/protobuf/minprotobuf
@@ -50,36 +49,3 @@ proc deserialize*(
message: data[varintLen + codecLen ..< data.len],
)
)
# TODO: These are not used anywhere
# TODO: consider changing the `dest` parameter to a multiaddress
proc serializeWithDestination*(
mixMsg: MixMessage, dest: string
): Result[seq[byte], string] =
let destBytes = multiAddrToBytes(dest).valueOr:
return err("Error in multiaddress conversion to bytes: " & error)
if len(destBytes) != addrSize:
error "Destination address must be exactly " & $addrSize & " bytes"
return err("Destination address must be exactly " & $addrSize & " bytes")
var serializedMixMsg = ?mixMsg.serialize()
let oldLen = serializedMixMsg.len
serializedMixMsg.setLen(oldLen + destBytes.len)
copyMem(addr serializedMixMsg[oldLen], unsafeAddr destBytes[0], destBytes.len)
return ok(serializedMixMsg)
# TODO: These are not used anywhere
proc deserializeWithDestination*(
T: typedesc[MixMessage], data: openArray[byte]
): Result[(T, string), string] =
if data.len <= addrSize:
return err("Deserialization with destination failed: not enough data")
let mixMsg = ?MixMessage.deserialize(data[0 ..^ (addrSize + 1)])
let dest = bytesToMultiAddr(data[^addrSize ..^ 1]).valueOr:
return err("Error in destination multiaddress conversion to bytes: " & error)
return ok((mixMsg, dest))

View File

@@ -2,8 +2,8 @@ import chronicles, chronos, sequtils, strutils, os, results
import std/[strformat, sysrand], metrics
import
./[
config, curve25519, exit_connection, fragmentation, mix_message, mix_node, protocol,
sphinx, serialization, tag_manager, utils, mix_metrics,
config, curve25519, exit_connection, fragmentation, mix_message, mix_node, sphinx,
serialization, tag_manager, utils, mix_metrics, exit_layer,
]
import libp2p
import
@@ -17,7 +17,7 @@ type MixProtocol* = ref object of LPProtocol
pubNodeInfo: Table[PeerId, MixPubInfo]
switch: Switch
tagManager: TagManager
pHandler: ProtocolHandler
exitLayer: ExitLayer
proc loadMixNodeInfo*(
index: int, nodeFolderInfoPath: string = "./nodeInfo"
@@ -52,18 +52,6 @@ proc cryptoRandomInt(max: int): Result[int, string] =
let value = cast[uint64](bytes)
return ok(int(value mod uint64(max)))
proc exitNodeIsDestination(
mixProto: MixProtocol, msg: MixMessage
) {.async: (raises: [CancelledError]).} =
let exitConn = MixExitConnection.new(msg.message)
trace "Received: ", receiver = multiAddr, message = message
await mixProto.pHandler(exitConn, msg.codec)
try:
await exitConn.close()
except CatchableError as e:
error "Failed to close exit connection: ", err = e.msg
return
proc handleMixNodeConnection(
mixProto: MixProtocol, conn: Connection
) {.async: (raises: [CancelledError]).} =
@@ -118,54 +106,12 @@ proc handleMixNodeConnection(
return
trace "Exit node - Received mix message: ",
receiver = multiAddr, message = deserialized.message
receiver = multiAddr, message = deserialized.message, codec = deserialized.codec
if nextHop == Hop() and delay == @[]:
await mixProto.exitNodeIsDestination(deserialized)
return
await mixProto.exitLayer.onMessage(
deserialized.codec, deserialized.message, nextHop
)
# Add delay
let delayMillis = (delay[0].int shl 8) or delay[1].int
await sleepAsync(milliseconds(delayMillis))
# Forward to destination
let destBytes = getHop(nextHop)
let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts = parts
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
let
locationAddrStr = parts[0]
peerIdStr = parts[1]
# Create MultiAddress and PeerId
let locationAddr = MultiAddress.init(locationAddrStr).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
let peerId = PeerId.init(peerIdStr).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
var destConn: Connection
try:
destConn = await mixProto.switch.dial(peerId, @[locationAddr], deserialized.codec)
await destConn.writeLp(deserialized.message)
#TODO: When response is implemented, we can read the response here
await destConn.close()
except CatchableError as e:
error "Failed to dial next hop: ", err = e.msg
mix_messages_error.inc(labelValues = ["Exit", "DAIL_FAILED"])
mix_messages_forwarded.inc(labelValues = ["Exit"])
of Intermediate:
trace "# Intermediate: ", multiAddr = multiAddr
@@ -391,30 +337,26 @@ proc anonymizeLocalProtocolSend*(
except CatchableError as e:
error "Failed to close outgoing stream: ", err = e.msg
method init*(mixProtocol: MixProtocol) {.gcsafe, raises: [].} =
proc handle(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
await mixProtocol.handleMixNodeConnection(conn)
mixProtocol.codecs = @[MixProtocolID]
mixProtocol.handler = handle
proc new*(
T: typedesc[MixProtocol],
mixNodeInfo: MixNodeInfo,
pubNodeInfo: Table[PeerId, MixPubInfo],
switch: Switch,
tagManager: TagManager,
handler: ProtocolHandler,
): Result[T, string] =
): T =
let mixProto = new(T)
mixProto.mixNodeInfo = mixNodeInfo
mixProto.pubNodeInfo = pubNodeInfo
mixProto.switch = switch
mixProto.tagManager = tagManager
mixProto.pHandler = handler
mixProto.init() # TODO: constructor should probably not call init
mixProto.exitLayer = ExitLayer.init(switch)
mixProto.codecs = @[MixProtocolID]
mixProto.handler = proc(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
await mixProto.handleMixNodeConnection(conn)
return ok(mixProto)
mixProto
proc new*(
T: typedesc[MixProtocol],
@@ -430,50 +372,6 @@ proc new*(
).valueOr:
return err("Failed to load mix pub info for index " & $index & " - err: " & error)
var sendHandlerFunc = proc(
conn: Connection, codec: string
): Future[void] {.async: (raises: [CancelledError]).} =
try:
await callHandler(switch, conn, codec)
except CatchableError as e:
error "Error during execution of MixProtocol handler: ", err = e.msg
return
let mixProto =
?MixProtocol.new(
mixNodeInfo, pubNodeInfo, switch, TagManager.new(), sendHandlerFunc
)
mixProto.init() # TODO: constructor should probably not call init
let mixProto = MixProtocol.new(mixNodeInfo, pubNodeInfo, switch, TagManager.new())
return ok(mixProto)
# TODO: is this needed
proc initialize*(
mixProtocol: MixProtocol,
localMixNodeInfo: MixNodeInfo,
switch: Switch,
mixNodeTable: Table[PeerId, MixPubInfo],
) =
#if mixNodeTable.len == 0:
# TODO:This is temporary check for testing, needs to be removed later
# probably protocol can be initiated without any mix nodes itself,
# and can be later supplied with nodes as they are discovered.
#return err("No mix nodes passed for the protocol initialization.")
mixProtocol.mixNodeInfo = localMixNodeInfo
mixProtocol.switch = switch
mixProtocol.pubNodeInfo = mixNodeTable
mixProtocol.tagManager = TagManager.new()
mixProtocol.init()
# TODO: is this needed
method setNodePool*(
mixProtocol: MixProtocol, mixNodeTable: Table[PeerId, MixPubInfo]
) {.base, gcsafe, raises: [].} =
mixProtocol.pubNodeInfo = mixNodeTable
# TODO: is this needed
method getNodePoolSize*(mixProtocol: MixProtocol): int {.base, gcsafe, raises: [].} =
mixProtocol.pubNodeInfo.len

View File

@@ -1,15 +0,0 @@
import chronos, chronicles, std/enumerate
import libp2p/[builders, stream/connection]
type ProtocolHandler* = proc(conn: Connection, codec: string): Future[void] {.
async: (raises: [CancelledError])
.}
method callHandler*(
switch: Switch, conn: Connection, codec: string
): Future[void] {.base, async.} =
for index, handler in enumerate(switch.ms.handlers):
if codec in handler.protos:
await handler.protocol.handler(conn, codec)
return
error "Handler doesn't exist", codec = codec

View File

@@ -62,36 +62,3 @@ suite "mix_message_tests":
error "Deserialized codec does not match the original",
original = codec, deserialized = dMixMsg.codec
fail()
test "serialize_and_deserialize_mix_message_and_destination":
let
message = "Hello World!"
codec = "/test/codec/1.0.0"
destination =
"/ip4/0.0.0.0/tcp/4242/p2p/16Uiu2HAmFkwLVsVh6gGPmSm9R3X4scJ5thVdKfWYeJsKeVrbcgVC"
mixMsg = MixMessage.init(message.toBytes(), codec)
let serializedResult = mixMsg.serializeWithDestination(destination)
if serializedResult.isErr:
error "Serialization with destination failed", err = serializedResult.error
fail()
let serialized = serializedResult.get()
let deserializedResult = MixMessage.deserializeWithDestination(serialized)
if deserializedResult.isErr:
error "Deserialization with destination failed", err = deserializedResult.error
fail()
let (dMixMessage, dDest) = deserializedResult.get()
if message != string.fromBytes(dMixMessage.message):
error "Deserialized message does not match the original",
original = message, deserialized = string.fromBytes(dMixMessage.message)
fail()
if codec != dMixMessage.codec:
error "Deserialized codec does not match the original",
original = codec, deserialized = dMixMessage.codec
fail()
if destination != dDest:
error "Deserialized destination does not match the original",
original = destination, deserialized = dDest
fail()