feat(mix): mix_protocol and entry connection (#1703)

This commit is contained in:
richΛrd
2025-09-22 09:09:18 -04:00
committed by GitHub
parent fbf96bb2ce
commit 647f76341e
7 changed files with 580 additions and 6 deletions

View File

@@ -0,0 +1,133 @@
import hashes, chronos, stew/byteutils, results, chronicles
import ../../stream/connection
import ../../varint
import ../../utils/sequninit
import ./mix_protocol
from fragmentation import DataSize
type MixDialer* = proc(
msg: seq[byte], codec: string, destination: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).}
type MixEntryConnection* = ref object of Connection
destination: MixDestination
codec: string
mixDialer: MixDialer
func shortLog*(conn: MixEntryConnection): string =
if conn == nil:
"MixEntryConnection(nil)"
else:
"MixEntryConnection(" & $conn.destination & ")"
chronicles.formatIt(MixEntryConnection):
shortLog(it)
method readOnce*(
s: MixEntryConnection, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readExactly*(
s: MixEntryConnection, pbytes: pointer, nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readLine*(
s: MixEntryConnection, limit = 0, sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readVarint*(
conn: MixEntryConnection
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method readLp*(
s: MixEntryConnection, maxSize: int
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]), public.} =
# TODO: implement
raise newLPStreamEOFError()
method write*(
self: MixEntryConnection, msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.mixDialer(msg, self.codec, self.destination)
proc write*(
self: MixEntryConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.write(msg.toBytes())
method writeLp*(
self: MixEntryConnection, msg: openArray[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
if msg.len() > DataSize:
let fut = newFuture[void]()
fut.fail(
newException(LPStreamError, "exceeds max msg size of " & $DataSize & " bytes")
)
return fut
## Write `msg` with a varint-encoded length prefix
let vbytes = PB.toBytes(msg.len().uint64)
var buf = newSeqUninit[byte](msg.len() + vbytes.len)
buf[0 ..< vbytes.len] = vbytes.toOpenArray()
buf[vbytes.len ..< buf.len] = msg
self.write(buf)
method writeLp*(
self: MixEntryConnection, msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true), public.} =
self.writeLp(msg.toOpenArrayByte(0, msg.high))
proc shortLog*(self: MixEntryConnection): string {.raises: [].} =
"[MixEntryConnection] Destination: " & $self.destination
method closeImpl*(
self: MixEntryConnection
): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
return fut
func hash*(self: MixEntryConnection): Hash =
hash($self.destination)
when defined(libp2p_agents_metrics):
proc setShortAgent*(self: MixEntryConnection, shortAgent: string) =
discard
proc new*(
T: typedesc[MixEntryConnection],
srcMix: MixProtocol,
destination: MixDestination,
codec: string,
): T {.raises: [].} =
var instance = T()
instance.destination = destination
instance.codec = codec
instance.mixDialer = proc(
msg: seq[byte], codec: string, dest: MixDestination
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
await srcMix.anonymizeLocalProtocolSend(
nil, msg, codec, dest, 0 # TODO: set incoming queue for replies and surbs
)
when defined(libp2p_agents_metrics):
instance.shortAgent = connection.shortAgent
instance
proc toConnection*(
srcMix: MixProtocol, destination: MixDestination, codec: string
): Result[Connection, string] {.gcsafe, raises: [].} =
## Create a stream to send and optionally receive responses.
## Under the hood it will wrap the message in a sphinx packet
## and send it via a random mix path.
ok(MixEntryConnection.new(srcMix, destination, codec))

View File

@@ -0,0 +1,36 @@
import chronicles, chronos, metrics
import ../../builders
import ../../stream/connection
import ./mix_metrics
type ExitLayer* = object
switch: Switch
proc init*(T: typedesc[ExitLayer], switch: Switch): T =
ExitLayer(switch: switch)
proc onMessage*(
self: ExitLayer,
codec: string,
message: seq[byte],
destAddr: MultiAddress,
destPeerId: PeerId,
) {.async: (raises: [CancelledError]).} =
# If dialing destination fails, no response is returned to
# the sender, so, flow can just end here. Only log errors
# for now
# https://github.com/vacp2p/mix/issues/86
try:
let destConn = await self.switch.dial(destPeerId, @[destAddr], codec)
defer:
await destConn.close()
await destConn.write(message)
except LPStreamError as exc:
error "Stream error while writing to next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "LPSTREAM_ERR"])
except DialFailedError as exc:
error "Failed to dial next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["ExitLayer", "DIAL_FAILED"])
except CancelledError as exc:
raise exc

View File

@@ -0,0 +1,13 @@
{.push raises: [].}
import metrics
declarePublicCounter mix_messages_recvd, "number of mix messages received", ["type"]
declarePublicCounter mix_messages_forwarded,
"number of mix messages forwarded", ["type"]
declarePublicCounter mix_messages_error,
"number of mix messages failed processing", ["type", "error"]
declarePublicGauge mix_pool_size, "number of nodes in the pool"

View File

@@ -0,0 +1,392 @@
import chronicles, chronos, sequtils, strutils, os, results
import std/[strformat, tables], metrics
import
./[
curve25519, fragmentation, mix_message, mix_node, sphinx, serialization,
tag_manager, mix_metrics, exit_layer, multiaddr,
]
import stew/endians2
import ../protocol
import ../../stream/[connection, lpstream]
import ../../[switch, multicodec, peerinfo]
const MixProtocolID* = "/mix/1.0.0"
## Mix Protocol defines a decentralized anonymous message routing layer for libp2p networks.
## It enables sender anonymity by routing each message through a decentralized mix overlay
## network composed of participating libp2p nodes, known as mix nodes. Each message is
## routed independently in a stateless manner, allowing other libp2p protocols to selectively
## anonymize messages without modifying their core protocol behavior.
type MixProtocol* = ref object of LPProtocol
mixNodeInfo: MixNodeInfo
pubNodeInfo: Table[PeerId, MixPubInfo]
switch: Switch
tagManager: TagManager
exitLayer: ExitLayer
rng: ref HmacDrbgContext
proc loadAllButIndexMixPubInfo*(
index, numNodes: int, pubInfoFolderPath: string = "./pubInfo"
): Result[Table[PeerId, MixPubInfo], string] =
var pubInfoTable = initTable[PeerId, MixPubInfo]()
for i in 0 ..< numNodes:
if i == index:
continue
let pubInfo = MixPubInfo.readFromFile(i, pubInfoFolderPath).valueOr:
return err("Failed to load pub info from file: " & error)
pubInfoTable[pubInfo.peerId] = pubInfo
return ok(pubInfoTable)
proc cryptoRandomInt(rng: ref HmacDrbgContext, max: int): Result[int, string] =
if max == 0:
return err("Max cannot be zero.")
let res = rng[].generate(uint64) mod uint64(max)
ok(res.int)
proc handleMixNodeConnection(
mixProto: MixProtocol, conn: Connection
) {.async: (raises: [LPStreamError, CancelledError]).} =
let receivedBytes =
try:
await conn.readLp(PacketSize)
except CancelledError as exc:
raise exc
finally:
await conn.close()
if receivedBytes.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream
# Process the packet
let (peerId, multiAddr, _, mixPrivKey, _, _) = mixProto.mixNodeInfo.get()
let sphinxPacket = SphinxPacket.deserialize(receivedBytes).valueOr:
error "Sphinx packet deserialization error", err = error
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return
let processedSP = processSphinxPacket(sphinxPacket, mixPrivKey, mixProto.tagManager).valueOr:
error "Failed to process Sphinx packet", err = error
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return
case processedSP.status
of Exit:
mix_messages_recvd.inc(labelValues = ["Exit"])
# This is the exit node, forward to destination
let msgChunk = MessageChunk.deserialize(processedSP.messageChunk).valueOr:
error "Deserialization failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
let unpaddedMsg = msgChunk.removePadding().valueOr:
error "Unpadding message failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
let deserialized = MixMessage.deserialize(unpaddedMsg).valueOr:
error "Deserialization failed", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_SPHINX"])
return
if processedSP.destination == Hop():
error "no destination available"
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
return
let destBytes = processedSP.destination.get()
let (destPeerId, destAddr) = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
trace "Exit node - Received mix message",
peerId,
message = deserialized.message,
codec = deserialized.codec,
to = destPeerId
await mixProto.exitLayer.onMessage(
deserialized.codec, deserialized.message, destAddr, destPeerId
)
mix_messages_forwarded.inc(labelValues = ["Exit"])
of Reply:
# TODO: implement
discard
of Intermediate:
trace "# Intermediate: ", peerId, multiAddr
# Add delay
mix_messages_recvd.inc(labelValues = ["Intermediate"])
await sleepAsync(milliseconds(processedSP.delayMs))
# Forward to next hop
let nextHopBytes = processedSP.nextHop.get()
let (nextPeerId, nextAddr) = bytesToMultiAddr(nextHopBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Intermediate", "INVALID_DEST"])
return
try:
let nextHopConn =
await mixProto.switch.dial(nextPeerId, @[nextAddr], MixProtocolID)
defer:
await nextHopConn.close()
await nextHopConn.writeLp(processedSP.serializedSphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Intermediate"])
except CancelledError as exc:
raise exc
except DialFailedError as exc:
error "Failed to dial next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
except LPStreamError as exc:
error "Failed to write to next hop: ", err = exc.msg
mix_messages_error.inc(labelValues = ["Intermediate", "DIAL_FAILED"])
of Duplicate:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "DUPLICATE"])
of InvalidMAC:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_MAC"])
proc getMaxMessageSizeForCodec*(
codec: string, numberOfSurbs: uint8 = 0
): Result[int, string] =
## Computes the maximum payload size (in bytes) available for a message when encoded
## with the given `codec`
## Returns an error if the codec length would cause it to exceeds the data capacity.
let serializedMsg = MixMessage.init(@[], codec).serialize()
if serializedMsg.len > DataSize:
return err("cannot encode messages for this codec")
return ok(DataSize - serializedMsg.len)
proc sendPacket(
mixProto: MixProtocol,
multiAddrs: MultiAddress,
sphinxPacket: seq[byte],
label: string,
) {.async: (raises: [CancelledError]).} =
## Send the wrapped message to the first mix node in the selected path
let (firstMixPeerId, firstMixAddr) = parseFullAddress(multiAddrs).valueOr:
error "Invalid multiaddress", err = error
mix_messages_error.inc(labelValues = [label, "NON_RECOVERABLE"])
return
try:
let nextHopConn =
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])
defer:
await nextHopConn.close()
await nextHopConn.writeLp(sphinxPacket)
except DialFailedError as exc:
error "Failed to dial next hop: ",
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
except LPStreamError as exc:
error "Failed to write to next hop: ",
peerId = firstMixPeerId, address = firstMixAddr, err = exc.msg
mix_messages_error.inc(labelValues = [label, "SEND_FAILED"])
except CancelledError as exc:
raise exc
mix_messages_forwarded.inc(labelValues = ["Entry"])
proc buildMessage(
msg: seq[byte], codec: string, peerId: PeerId
): Result[Message, (string, string)] =
let
mixMsg = MixMessage.init(msg, codec)
serialized = mixMsg.serialize()
if serialized.len > DataSize:
return err(("message size exceeds maximum payload size", "INVALID_SIZE"))
let
paddedMsg = addPadding(serialized, peerId)
serializedMsgChunk = paddedMsg.serialize()
ok(serializedMsgChunk)
## Represents the final target of a mixnet message.
## contains the peer id and multiaddress of the destination node.
type MixDestination* = object
peerId: PeerId
address: MultiAddress
proc init*(T: typedesc[MixDestination], peerId: PeerId, address: MultiAddress): T =
## Initializes a destination object with the given peer id and multiaddress.
T(peerId: peerId, address: address)
proc `$`*(d: MixDestination): string =
$d.address & "/p2p/" & $d.peerId
proc anonymizeLocalProtocolSend*(
mixProto: MixProtocol,
incoming: AsyncQueue[seq[byte]],
msg: seq[byte],
codec: string,
destination: MixDestination,
numSurbs: uint8,
) {.async: (raises: [CancelledError, LPStreamError]).} =
mix_messages_recvd.inc(labelValues = ["Entry"])
var
multiAddrs: seq[MultiAddress] = @[]
publicKeys: seq[FieldElement] = @[]
hop: seq[Hop] = @[]
delay: seq[seq[byte]] = @[]
exitPeerId: PeerId
# Select L mix nodes at random
let numMixNodes = mixProto.pubNodeInfo.len
var numAvailableNodes = numMixNodes
debug "Destination data", destination
if mixProto.pubNodeInfo.hasKey(destination.peerId):
numAvailableNodes = numMixNodes - 1
if numAvailableNodes < PathLength:
error "No. of public mix nodes less than path length.",
numMixNodes = numAvailableNodes, pathLength = PathLength
mix_messages_error.inc(labelValues = ["Entry", "LOW_MIX_POOL"])
return
# Skip the destination peer
var pubNodeInfoKeys =
mixProto.pubNodeInfo.keys.toSeq().filterIt(it != destination.peerId)
var availableIndices = toSeq(0 ..< pubNodeInfoKeys.len)
var i = 0
while i < PathLength:
let randomIndexPosition = cryptoRandomInt(mixProto.rng, availableIndices.len).valueOr:
error "Failed to generate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
let selectedIndex = availableIndices[randomIndexPosition]
let randPeerId = pubNodeInfoKeys[selectedIndex]
availableIndices.del(randomIndexPosition)
# Last hop will be the exit node that will forward the request
if i == PathLength - 1:
exitPeerId = randPeerId
debug "Selected mix node: ", indexInPath = i, peerId = randPeerId
# Extract multiaddress, mix public key, and hop
let (peerId, multiAddr, mixPubKey, _) =
mixProto.pubNodeInfo.getOrDefault(randPeerId).get()
multiAddrs.add(multiAddr)
publicKeys.add(mixPubKey)
let multiAddrBytes = multiAddrToBytes(peerId, multiAddr).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_MIX_INFO"])
#TODO: should we skip and pick a different node here??
return
hop.add(Hop.init(multiAddrBytes))
# Compute delay
let delayMillisec =
if i != PathLength - 1:
cryptoRandomInt(mixProto.rng, 3).valueOr:
error "Failed to generate random number", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
else:
0 # Last hop does not require a delay
delay.add(@(delayMillisec.uint16.toBytesBE()))
i = i + 1
#Encode destination
let destAddrBytes = multiAddrToBytes(destination.peerId, destination.address).valueOr:
error "Failed to convert multiaddress to bytes", err = error
mix_messages_error.inc(labelValues = ["Entry", "INVALID_DEST"])
return
let destHop = Hop.init(destAddrBytes)
let message = buildMessage(msg, codec, mixProto.mixNodeInfo.peerId).valueOr:
error "Error building message", err = error[0]
mix_messages_error.inc(labelValues = ["Entry", error[1]])
return
# Wrap in Sphinx packet
let sphinxPacket = wrapInSphinxPacket(message, publicKeys, delay, hop, destHop).valueOr:
error "Failed to wrap in sphinx packet", err = error
mix_messages_error.inc(labelValues = ["Entry", "NON_RECOVERABLE"])
return
# Send the wrapped message to the first mix node in the selected path
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, "Entry")
proc init*(
mixProto: MixProtocol,
mixNodeInfo: MixNodeInfo,
pubNodeInfo: Table[PeerId, MixPubInfo],
switch: Switch,
tagManager: TagManager = TagManager.new(),
rng: ref HmacDrbgContext = newRng(),
) =
mixProto.mixNodeInfo = mixNodeInfo
mixProto.pubNodeInfo = pubNodeInfo
mixProto.switch = switch
mixProto.tagManager = tagManager
mixProto.exitLayer = ExitLayer.init(switch)
mixProto.codecs = @[MixProtocolID]
mixProto.rng = rng
mixProto.handler = proc(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
await mixProto.handleMixNodeConnection(conn)
except LPStreamError as e:
debug "Stream error", conn = conn, err = e.msg
proc new*(
T: typedesc[MixProtocol],
mixNodeInfo: MixNodeInfo,
pubNodeInfo: Table[PeerId, MixPubInfo],
switch: Switch,
tagManager: TagManager = TagManager.new(),
rng: ref HmacDrbgContext = newRng(),
): T =
let mixProto = new(T)
mixProto.init(mixNodeInfo, pubNodeInfo, switch)
mixProto
proc new*(
T: typedesc[MixProtocol],
index, numNodes: int,
switch: Switch,
nodeFolderInfoPath: string = ".",
rng: ref HmacDrbgContext = newRng(),
): Result[T, string] =
## Constructs a new `MixProtocol` instance for the mix node at `index`,
## loading its private info from `nodeInfo` and the public info of all other nodes from `pubInfo`.
let mixNodeInfo = MixNodeInfo.readFromFile(index, nodeFolderInfoPath / fmt"nodeInfo").valueOr:
return err("Failed to load mix node info for index " & $index & " - err: " & error)
let pubNodeInfo = loadAllButIndexMixPubInfo(
index, numNodes, nodeFolderInfoPath / fmt"pubInfo"
).valueOr:
return err("Failed to load mix pub info for index " & $index & " - err: " & error)
let mixProto =
MixProtocol.new(mixNodeInfo, pubNodeInfo, switch, TagManager.new(), rng)
return ok(mixProto)
proc setNodePool*(
mixProtocol: MixProtocol, mixNodeTable: Table[PeerId, MixPubInfo]
) {.gcsafe, raises: [].} =
mixProtocol.pubNodeInfo = mixNodeTable
proc getNodePoolSize*(mixProtocol: MixProtocol): int {.gcsafe, raises: [].} =
mixProtocol.pubNodeInfo.len

View File

@@ -6,7 +6,6 @@ const
k* = 16 # Security parameter
r* = 5 # Maximum path length
t* = 6 # t.k - combined length of next hop address and delay
L* = 3 # Path length
AlphaSize* = 32 # Group element
BetaSize* = ((r * (t + 1)) + 1) * k # bytes
GammaSize* = 16 # Output of HMAC-SHA-256, truncated to 16 bytes
@@ -15,7 +14,7 @@ const
AddrSize* = (t * k) - DelaySize # Address size
PacketSize* = 4608 # Total packet size (from spec)
MessageSize* = PacketSize - HeaderSize - k # Size of the message itself
payloadSize* = MessageSize + k # Total payload size
PayloadSize* = MessageSize + k # Total payload size
SurbSize* = HeaderSize + k + AddrSize
# Size of a surb packet inside the message payload
SurbLenSize* = 1 # Size of the field storing the number of surbs
@@ -70,8 +69,8 @@ proc serialize*(message: Message): seq[byte] =
proc deserialize*(
T: typedesc[Message], serializedMessage: openArray[byte]
): Result[T, string] =
if len(serializedMessage) != payloadSize:
return err("Serialized message must be exactly " & $payloadSize & " bytes")
if len(serializedMessage) != PayloadSize:
return err("Serialized message must be exactly " & $PayloadSize & " bytes")
return ok(serializedMessage[k ..^ 1])
type Hop* = object

View File

@@ -3,7 +3,8 @@ import ./[crypto, curve25519, serialization, tag_manager]
import ../../crypto/crypto
import ../../utils/sequninit
const PaddingLength = (((t + 1) * (r - L)) + 1) * k
const PathLength* = 3 # Path length (L)
const PaddingLength = (((t + 1) * (r - PathLength)) + 1) * k
type ProcessingStatus* = enum
Exit

View File

@@ -56,7 +56,7 @@ suite "serialization_tests":
header = Header.init(
newSeq[byte](AlphaSize), newSeq[byte](BetaSize), newSeq[byte](GammaSize)
)
payload = newSeq[byte](payloadSize)
payload = newSeq[byte](PayloadSize)
packet = SphinxPacket.init(header, payload)
let serialized = packet.serialize()