refactor: remove exit == destination (#81)

This commit is contained in:
richΛrd
2025-09-08 10:09:34 -04:00
committed by GitHub
parent 70a17d138b
commit f380a0eb8c
9 changed files with 71 additions and 367 deletions

View File

@@ -114,7 +114,10 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
): Connection {.gcsafe, raises: [].} =
try:
return mixProto.toConnection(destPeerId, codec).get()
let dest = destAddr.valueOr:
debug "No destination address available"
return
return mixProto.toConnection(MixDestination.init(destPeerId, dest), codec).get()
except CatchableError as e:
error "Error during execution of MixEntryConnection callback: ", err = e.msg
return nil

View File

@@ -116,7 +116,10 @@ proc makeMixConnCb(mixProto: MixProtocol): CustomConnCreationProc =
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
): Connection {.gcsafe, raises: [].} =
try:
return mixProto.toConnection(destPeerId, codec).get()
let dest = destAddr.valueOr:
debug "No destination address available"
return
return mixProto.toConnection(MixDestination.init(destPeerId, dest), codec).get()
except CatchableError as e:
error "Error during execution of MixEntryConnection callback: ", err = e.msg
return nil

View File

@@ -105,7 +105,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
receiverIndex = senderIndex + 1
let conn = mixProto[senderIndex].toConnection(
Destination.forwardToAddr(
MixDestination.init(
nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0]
),
NoRespPingCodec,

View File

@@ -85,7 +85,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
return
# We'll fwd requests, so let's register how should the exit node will read responses
proto.registerFwdReadBehavior(PingCodec, readExactly(32))
proto.registerDestReadBehavior(PingCodec, readExactly(32))
mixProto.add(proto)
@@ -105,7 +105,7 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
receiverIndex = senderIndex + 1
let conn = mixProto[senderIndex].toConnection(
Destination.forwardToAddr(
MixDestination.init(
nodes[receiverIndex].peerInfo.peerId, nodes[receiverIndex].peerInfo.addrs[0]
),
PingCodec,

16
mix.nim
View File

@@ -15,27 +15,23 @@ export writeMixPubInfoToFile
export writeMixNodeInfoToFile
export getMixNodeInfo
export `new`
export init
export getMaxMessageSizeForCodec
export deleteNodeInfoFolder
export deletePubInfoFolder
export Destination
export DestinationType
export forwardToAddr
export mixNode
export MixDestination
export MixParameters
export fwdReadBehaviorCb
export registerFwdReadBehavior
export destReadBehaviorCb
export registerDestReadBehavior
export MixNodes
proc readLp*(maxSize: int): fwdReadBehaviorCb =
proc readLp*(maxSize: int): destReadBehaviorCb =
return proc(
conn: Connection
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
await conn.readLp(maxSize)
proc readExactly*(nBytes: int): fwdReadBehaviorCb =
proc readExactly*(nBytes: int): destReadBehaviorCb =
return proc(
conn: Connection
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =

View File

@@ -5,34 +5,8 @@ import ./mix_protocol
import ./config
from fragmentation import dataSize
type
DestinationType* = enum
MixNode
ForwardAddr
Destination* = object
peerId*: PeerId
case kind*: DestinationType
of ForwardAddr:
address*: MultiAddress
else:
discard
proc mixNode*(T: typedesc[Destination], p: PeerId): T =
T(kind: DestinationType.MixNode, peerId: p)
proc forwardToAddr*(T: typedesc[Destination], p: PeerId, address: MultiAddress): T =
T(kind: DestinationType.ForwardAddr, peerId: p, address: address)
proc `$`*(d: Destination): string =
case d.kind
of MixNode:
"Destination[MixNode](" & $d.peerId & ")"
of ForwardAddr:
"Destination[ForwardAddr](" & $d.address & "/p2p/" & $d.peerId & ")"
type MixDialer* = proc(
msg: seq[byte], codec: string, destination: Destination
msg: seq[byte], codec: string, destination: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).}
type MixParameters* = object
@@ -40,7 +14,7 @@ type MixParameters* = object
numSurbs*: Opt[uint8]
type MixEntryConnection* = ref object of Connection
destination: Destination
destination: MixDestination
codec: string
mixDialer: MixDialer
params: Opt[MixParameters]
@@ -235,7 +209,7 @@ when defined(libp2p_agents_metrics):
proc new*(
T: typedesc[MixEntryConnection],
srcMix: MixProtocol,
destination: Destination,
destination: MixDestination,
codec: string,
params: Opt[MixParameters],
): T {.raises: [].} =
@@ -261,17 +235,11 @@ proc new*(
instance.incomingFut = checkForIncoming()
instance.mixDialer = proc(
msg: seq[byte], codec: string, dest: Destination
msg: seq[byte], codec: string, dest: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
try:
let (peerId, destination) =
if dest.kind == DestinationType.MixNode:
(Opt.some(dest.peerId), Opt.none(MixDestination))
else:
(Opt.none(PeerId), Opt.some(MixDestination.init(dest.peerId, dest.address)))
await srcMix.anonymizeLocalProtocolSend(
instance.incoming, msg, codec, peerId, destination, numSurbs
instance.incoming, msg, codec, dest, numSurbs
)
except CatchableError as e:
error "Error during execution of anonymizeLocalProtocolSend: ", err = e.msg
@@ -284,19 +252,14 @@ proc new*(
proc toConnection*(
srcMix: MixProtocol,
destination: Destination | PeerId,
destination: MixDestination,
codec: string,
params: Opt[MixParameters] = Opt.none(MixParameters),
): Result[Connection, string] {.gcsafe, raises: [].} =
let dest =
when destination is PeerId:
Destination.mixNode(destination)
if not srcMix.hasDestReadBehavior(codec):
if params.get(MixParameters()).expectReply.get(false):
return err("no destination read behavior for codec")
else:
destination
warn "no destination read behavior for codec", codec
if dest.kind == DestinationType.ForwardAddr and
params.get(MixParameters()).expectReply.get(false) and
not srcMix.hasFwdBehavior(codec):
return err("no forward behavior for codec")
ok(MixEntryConnection.new(srcMix, dest, codec, params))
ok(MixEntryConnection.new(srcMix, destination, codec, params))

View File

@@ -1,189 +0,0 @@
import hashes, chronos, libp2p/varint, stew/byteutils
import libp2p/stream/connection
from fragmentation import dataSize
type MixExitConnection* = ref object of Connection
message: seq[byte]
response: seq[byte]
method join*(
self: MixExitConnection
): Future[void] {.async: (raises: [CancelledError], raw: true), public.} =
discard
method readExactly*(
self: MixExitConnection, pbytes: pointer, nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
if nbytes == 0:
return
if self.message.len < nbytes:
raise newException(
LPStreamError, "Not enough data in to read exactly " & $nbytes & " bytes."
)
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
for i in 0 ..< nbytes:
pbuffer[i] = self.message[i]
if nbytes < self.message.len:
self.message = self.message[nbytes .. ^1]
else:
self.isEof = true
self.message = @[]
# ToDo: Check readLine, readVarint implementations
method readLine*(
self: MixExitConnection, limit = 0, sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
var
lim = if limit <= 0: -1 else: limit
result: seq[byte] = @[]
state = 0
while true:
if state < len(sep):
if self.message.len == 0:
raise newException(LPStreamError, "Not enough data to read line.")
let ch = self.message[0]
self.message.delete(0)
if byte(sep[state]) == ch:
inc(state)
if state == len(sep):
break
else:
result.add(ch)
state = 0
if lim > 0 and len(result) == lim:
break
else:
break
return cast[string](result)
method readVarint*(
self: MixExitConnection
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
var
buffer: array[10, byte]
bytesRead = 0
while bytesRead < buffer.len:
if self.message.len == 0:
raise newException(LPStreamError, "Not enough data to read varint")
buffer[bytesRead] = self.message[0]
self.message.delete(0)
bytesRead += 1
var
varint: uint64
length: int
let res = PB.getUVarint(buffer.toOpenArray(0, bytesRead - 1), length, varint)
if res.isOk():
return varint
if res.error() != VarintError.Incomplete:
break
raise newException(LPStreamError, "Cannot parse varint")
method readLp*(
self: MixExitConnection, maxSize: int
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
let
length = await self.readVarint()
maxLen = uint64(if maxSize < 0: int.high else: maxSize)
if length > maxLen:
raise (ref MaxSizeError)(msg: "Message exceeds maximum length")
if length == 0:
self.isEof = true
return @[]
if self.message.len < int(length):
raise newException(LPStreamError, "Not enough data to read " & $length & " bytes.")
result = self.message[0 ..< int(length)]
if int(length) == self.message.len:
self.isEof = true
self.message = @[]
else:
self.message = self.message[int(length) .. ^1]
return result
method write*(
self: MixExitConnection, msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.response.add(msg)
proc write*(
self: MixExitConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.write(msg.toBytes())
method writeLp*(
self: MixExitConnection, msg: openArray[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
if msg.len() > dataSize:
let fut = newFuture[void]()
fut.fail(
newException(LPStreamError, "exceeds max msg size of " & $dataSize & " bytes")
)
return fut
var
vbytes: seq[byte] = @[]
value = msg.len().uint64
while value >= 128:
vbytes.add(byte((value and 127) or 128))
value = value shr 7
vbytes.add(byte(value))
var buf = newSeqUninitialized[byte](msg.len() + vbytes.len)
buf[0 ..< vbytes.len] = vbytes.toOpenArray(0, vbytes.len - 1)
buf[vbytes.len ..< buf.len] = msg
self.write(buf)
method writeLp*(
self: MixExitConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.writeLp(msg.toOpenArrayByte(0, msg.high))
func shortLog*(self: MixExitConnection): string {.raises: [].} =
"MixExitConnection"
method initStream*(self: MixExitConnection) =
discard
method closeImpl*(
self: MixExitConnection
): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
return fut
func hash*(self: MixExitConnection): Hash =
discard
proc getResponse*(self: MixExitConnection): seq[byte] =
let r = self.response
self.response = @[]
return r
proc new*(T: typedesc[MixExitConnection], message: seq[byte]): T =
let instance = T(message: message)
when defined(libp2p_agents_metrics):
instance.shortAgent = connection.shortAgent
instance
when defined(libp2p_agents_metrics):
proc setShortAgent*(self: MixExitConnection, shortAgent: string) =
discard

View File

@@ -1,57 +1,27 @@
import std/[enumerate, strutils]
import std/strutils
import chronicles, chronos, metrics
import libp2p, libp2p/[builders, stream/connection]
import
./[
mix_metrics, exit_connection, reply_connection, serialization, utils # fragmentation
]
import ./[mix_metrics, reply_connection, serialization, utils]
type OnReplyDialer* =
proc(surb: SURB, message: seq[byte]) {.async: (raises: [CancelledError]).}
type ProtocolHandler* = proc(conn: Connection, codec: string): Future[void] {.
async: (raises: [CancelledError])
.}
type fwdReadBehaviorCb* = proc(conn: Connection): Future[seq[byte]] {.
type destReadBehaviorCb* = proc(conn: Connection): Future[seq[byte]] {.
async: (raises: [CancelledError, LPStreamError])
.}
type ExitLayer* = object
switch: Switch
pHandler: ProtocolHandler
onReplyDialer: OnReplyDialer
fwdRBehavior: TableRef[string, fwdReadBehaviorCb]
proc callHandler(
switch: Switch, conn: Connection, codec: string
): Future[void] {.async: (raises: [CatchableError]).} =
for index, handler in enumerate(switch.ms.handlers):
if codec in handler.protos:
await handler.protocol.handler(conn, codec)
return
error "Handler doesn't exist", codec = codec
destReadBehavior: TableRef[string, destReadBehaviorCb]
proc init*(
T: typedesc[ExitLayer],
switch: Switch,
onReplyDialer: OnReplyDialer,
fwdRBehavior: TableRef[string, fwdReadBehaviorCb],
destReadBehavior: TableRef[string, destReadBehaviorCb],
): T =
ExitLayer(
switch: switch,
onReplyDialer: onReplyDialer,
fwdRBehavior: fwdRBehavior,
pHandler: proc(
conn: Connection, codec: string
): Future[void] {.async: (raises: [CancelledError]).} =
try:
await callHandler(switch, conn, codec)
except CatchableError as e:
error "Error during execution of MixProtocol handler: ", err = e.msg
,
)
ExitLayer(switch: switch, onReplyDialer: onReplyDialer, destReadBehavior: destReadBehavior)
proc replyDialerCbFactory(self: ExitLayer): MixReplyDialer =
return proc(
@@ -84,26 +54,11 @@ proc reply(
error "could not reply", description = exc.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "REPLY_FAILED"])
proc runHandler(
self: ExitLayer, codec: string, message: seq[byte], surbs: seq[SURB]
) {.async: (raises: [CancelledError]).} =
let exitConn = MixExitConnection.new(message)
defer:
if not exitConn.isNil:
await exitConn.close()
await self.pHandler(exitConn, codec)
if surbs.len != 0:
let response = exitConn.getResponse()
await self.reply(surbs, response)
proc onMessage*(
self: ExitLayer, codec: string, message: seq[byte], nextHop: Hop, surbs: seq[SURB]
) {.async: (raises: [CancelledError]).} =
if nextHop == Hop():
trace "onMessage - exit is destination", codec, message
await self.runHandler(codec, message, surbs)
error "no destination available"
return
# Forward to destination
@@ -140,13 +95,13 @@ proc onMessage*(
await destConn.write(message)
if surbs.len != 0:
if not self.fwdRBehavior.hasKey(codec):
error "No fwdRBehavior for codec", codec
if not self.destReadBehavior.hasKey(codec):
error "No destReadBehavior for codec", codec
return
var behaviorCb: fwdReadBehaviorCb
var behaviorCb: destReadBehaviorCb
try:
behaviorCb = self.fwdRBehavior[codec]
behaviorCb = self.destReadBehavior[codec]
except KeyError:
doAssert false, "checked with HasKey"

View File

@@ -2,8 +2,8 @@ import chronicles, chronos, sequtils, strutils, os, results
import std/[strformat, sysrand, tables], metrics
import
./[
config, curve25519, exit_connection, fragmentation, mix_message, mix_node, sphinx,
serialization, tag_manager, utils, mix_metrics, exit_layer,
config, curve25519, fragmentation, mix_message, mix_node, sphinx, serialization,
tag_manager, utils, mix_metrics, exit_layer,
]
import libp2p
import
@@ -25,15 +25,15 @@ type MixProtocol* = ref object of LPProtocol
rng: ref HmacDrbgContext
# TODO: verify if this requires cleanup for cases in which response never arrives (and connection is closed)
connCreds: Table[I, ConnCreds]
fwdRBehavior: TableRef[string, fwdReadBehaviorCb]
destReadBehavior: TableRef[string, destReadBehaviorCb]
proc hasFwdBehavior*(mixProto: MixProtocol, codec: string): bool =
return mixProto.fwdRBehavior.hasKey(codec)
proc hasDestReadBehavior*(mixProto: MixProtocol, codec: string): bool =
return mixProto.destReadBehavior.hasKey(codec)
proc registerFwdReadBehavior*(
mixProto: MixProtocol, codec: string, fwdBehavior: fwdReadBehaviorCb
proc registerDestReadBehavior*(
mixProto: MixProtocol, codec: string, fwdBehavior: destReadBehaviorCb
) =
mixProto.fwdRBehavior[codec] = fwdBehavior
mixProto.destReadBehavior[codec] = fwdBehavior
proc loadMixNodeInfo*(
index: int, nodeFolderInfoPath: string = "./nodeInfo"
@@ -406,17 +406,9 @@ proc anonymizeLocalProtocolSend*(
incoming: AsyncQueue[seq[byte]],
msg: seq[byte],
codec: string,
destPeerId: Opt[PeerId],
fwdDestination: Opt[MixDestination],
destination: MixDestination,
numSurbs: uint8,
) {.async.} =
## destPeerId: use when dest == exit
## fwdDestination: use when dest != exit
doAssert (destPeerId.isSome and fwdDestination.isNone) or
(destPeerId.isNone and fwdDestination.isSome),
"specify either the destPeerId or destination but not both"
let (multiAddr, _, _, _, _) = getMixNodeInfo(mixProto.mixNodeInfo)
mix_messages_recvd.inc(labelValues = ["Entry"])
@@ -432,11 +424,9 @@ proc anonymizeLocalProtocolSend*(
let numMixNodes = mixProto.pubNodeInfo.len
var numAvailableNodes = numMixNodes
info "Destination data", destPeerId, fwdDestination
debug "Destination data", destination
let skipDest = destPeerId.valueOr:
fwdDestination.value.peerId
if mixProto.pubNodeInfo.hasKey(skipDest):
if mixProto.pubNodeInfo.hasKey(destination.peerId):
numAvailableNodes = numMixNodes - 1
if numAvailableNodes < L:
@@ -450,35 +440,22 @@ proc anonymizeLocalProtocolSend*(
randPeerId: PeerId
availableIndices = toSeq(0 ..< numMixNodes)
if destPeerId.isSome:
let index = pubNodeInfoKeys.find(destPeerId.value())
if index != -1:
availableIndices.del(index)
else:
error "Destination does not support Mix"
return
var i = 0
while i < L:
if destPeerId.isSome and i == L - 1:
randPeerId = destPeerId.value()
exitPeerId = destPeerId.value()
else:
let randomIndexPosition = cryptoRandomInt(availableIndices.len).valueOr:
error "Failed to genanrate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
let selectedIndex = availableIndices[randomIndexPosition]
randPeerId = pubNodeInfoKeys[selectedIndex]
availableIndices.del(randomIndexPosition)
let randomIndexPosition = cryptoRandomInt(availableIndices.len).valueOr:
error "Failed to genanrate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
let selectedIndex = availableIndices[randomIndexPosition]
randPeerId = pubNodeInfoKeys[selectedIndex]
availableIndices.del(randomIndexPosition)
if fwdDestination.isSome:
# Skip the destination peer
if randPeerId == fwdDestination.value().peerId:
continue
# Last hop will be the exit node that will forward the request
if i == L - 1:
exitPeerId = randPeerId
# Skip the destination peer
if randPeerId == destination.peerId:
continue
# Last hop will be the exit node that will forward the request
if i == L - 1:
exitPeerId = randPeerId
debug "Selected mix node: ", indexInPath = i, peerId = randPeerId
@@ -510,16 +487,12 @@ proc anonymizeLocalProtocolSend*(
i = i + 1
let destHop =
if fwdDestination.isSome:
#Encode destination
let destAddrBytes = multiAddrToBytes($fwdDestination.value()).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"])
return
Hop.init(destAddrBytes)
else:
Hop()
#Encode destination
let destAddrBytes = multiAddrToBytes($destination).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"])
return
let destHop = Hop.init(destAddrBytes)
let msgWithSurbs = mixProto.prepareMsgWithSurbs(incoming, msg, numSurbs, exitPeerId).valueOr:
error "Could not prepend SURBs", err = error
@@ -570,14 +543,14 @@ proc new*(
mixProto.pubNodeInfo = pubNodeInfo
mixProto.switch = switch
mixProto.tagManager = tagManager
mixProto.fwdRBehavior = newTable[string, fwdReadBehaviorCb]()
mixProto.destReadBehavior = newTable[string, destReadBehaviorCb]()
let onReplyDialer = proc(
surb: SURB, message: seq[byte]
) {.async: (raises: [CancelledError]).} =
await mixProto.reply(surb, message)
mixProto.exitLayer = ExitLayer.init(switch, onReplyDialer, mixProto.fwdRBehavior)
mixProto.exitLayer = ExitLayer.init(switch, onReplyDialer, mixProto.destReadBehavior)
mixProto.codecs = @[MixProtocolID]
mixProto.rng = rng
mixProto.handler = proc(