feat: benchmark logs (#80)

This commit is contained in:
richΛrd
2025-09-08 14:30:14 -04:00
committed by GitHub
parent eb4b8d4ff1
commit bc0a9be913
6 changed files with 175 additions and 42 deletions

View File

@@ -49,10 +49,12 @@ Execute the test suite with:
## Usage
Run the Mix protocol proof-of-concept:
Run the Mix protocol proof-of-concepts:
```bash
nim c -r src/mix_poc.nim
nim c -r examples/poc_gossipsub.nim
nim c -r examples/poc_resp_ping.nim
nim c -r examples/poc_noresp_ping.nim
```
## Current Implementation Challenges

View File

@@ -114,7 +114,8 @@ proc mixnetSimulation() {.async: (raises: [Exception]).} =
return
discard await noRespPingProto[senderIndex].noRespPing(conn)
await sleepAsync(1.seconds)
await sleepAsync(3.seconds)
deleteNodeInfoFolder()
deletePubInfoFolder()

View File

@@ -24,6 +24,7 @@ export MixParameters
export destReadBehaviorCb
export registerDestReadBehavior
export MixNodes
export initMixMultiAddrByIndex
proc readLp*(maxSize: int): destReadBehaviorCb =
## create callback to read length prefixed msg, with the length encoded as a varint

View File

@@ -1,7 +1,6 @@
import std/strutils
import chronicles, chronos, metrics
import libp2p, libp2p/[builders, stream/connection]
import ./[mix_metrics, reply_connection, serialization, utils]
import ./[mix_metrics, reply_connection, serialization]
type OnReplyDialer* =
proc(surb: SURB, message: seq[byte]) {.async: (raises: [CancelledError]).}
@@ -58,43 +57,17 @@ proc reply(
mix_messages_error.inc(labelValues = ["ExitLayer", "REPLY_FAILED"])
proc onMessage*(
self: ExitLayer, codec: string, message: seq[byte], nextHop: Hop, surbs: seq[SURB]
self: ExitLayer,
codec: string,
message: seq[byte],
destAddr: MultiAddress,
destPeerId: PeerId,
surbs: seq[SURB],
) {.async: (raises: [CancelledError]).} =
if nextHop == Hop():
error "no destination available"
return
# Forward to destination
let destBytes = getHop(nextHop)
let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
# Create MultiAddress and PeerId
let locationAddr = MultiAddress.init(parts[0]).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
let peerId = PeerId.init(parts[1]).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["ExitLayer", "INVALID_DEST"])
return
trace "onMessage - exit is not destination", peerId, locationAddr, codec, message
var destConn: Connection
var response: seq[byte]
try:
destConn = await self.switch.dial(peerId, @[locationAddr], codec)
destConn = await self.switch.dial(destPeerId, @[destAddr], codec)
await destConn.write(message)
if surbs.len != 0:

View File

@@ -304,3 +304,11 @@ proc findByPeerId*(self: MixNodes, peerId: PeerId): Result[MixNodeInfo, string]
if peerIdRes == peerId:
return ok(node)
return err("No node with peer id: " & $peerId)
proc initMixMultiAddrByIndex*(
self: var MixNodes, index: int, multiAddr: string
): Result[void, string] =
if index < 0 or index >= self.len:
return err("Index must be between 0 and " & $(self.len))
self[index].multiAddr = multiAddr
ok()

View File

@@ -10,6 +10,9 @@ import
libp2p/
[protocols/ping, protocols/protocol, stream/connection, stream/lpstream, switch]
when defined(enable_mix_benchmarks):
import stew/endians2
const MixProtocolID* = "/mix/1.0.0"
type ConnCreds = object
@@ -32,6 +35,30 @@ type MixProtocol* = ref object of LPProtocol
connCreds: Table[I, ConnCreds]
destReadBehavior: TableRef[string, destReadBehaviorCb]
proc benchmarkLog*(
eventName: static[string],
myPeerId: PeerId,
startTime: Moment,
msgId: uint64,
orig: uint64,
fromPeerId: Opt[PeerId],
toPeerId: Opt[PeerId],
) =
let endTime = Moment.now()
let procDelay = (endTime - startTime).milliseconds()
let fromPeerId =
if fromPeerId.isNone:
"None"
else:
fromPeerId.get().shortLog()
let toPeerId =
if toPeerId.isNone:
"None"
else:
toPeerId.get().shortLog()
info eventName,
msgId, fromPeerId, toPeerId, myPeerId, orig, current = startTime, procDelay
proc hasDestReadBehavior*(mixProto: MixProtocol, codec: string): bool =
return mixProto.destReadBehavior.hasKey(codec)
@@ -77,7 +104,16 @@ proc handleMixNodeConnection(
mixProto: MixProtocol, conn: Connection
) {.async: (raises: [CancelledError]).} =
var receivedBytes: seq[byte]
when defined(enable_mix_benchmarks):
var metadata: seq[byte]
var fromPeerId: PeerId
try:
when defined(enable_mix_benchmarks):
metadata = await conn.readLp(16)
fromPeerId = conn.peerId
receivedBytes = await conn.readLp(packetSize)
except Exception as e:
error "Failed to read: ", err = e.msg
@@ -88,6 +124,13 @@ proc handleMixNodeConnection(
except CatchableError as e:
error "Failed to close incoming stream: ", err = e.msg
when defined(enable_mix_benchmarks):
let startTime = Moment.now()
if metadata.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream
if receivedBytes.len == 0:
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "NO_DATA"])
return # No data, end of stream
@@ -105,6 +148,11 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Intermediate/Exit", "INVALID_SPHINX"])
return
when defined(enable_mix_benchmarks):
let
orig = uint64.fromBytesLE(metadata[0 ..< 8])
msgId = uint64.fromBytesLE(metadata[8 ..< 16])
case processedSP.status
of Exit:
mix_messages_recvd.inc(labelValues = [$processedSP.status])
@@ -132,8 +180,46 @@ proc handleMixNodeConnection(
trace "Exit node - Received mix message",
receiver = multiAddr, message = deserialized.message, codec = deserialized.codec
if processedSP.destination == Hop():
error "no destination available"
mix_messages_error.inc(labelValues = ["Exit", "NO_DESTINATION"])
return
let destBytes = getHop(processedSP.destination)
let fullAddrStr = bytesToMultiAddr(destBytes).valueOr:
error "Failed to convert bytes to multiaddress", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
let parts = fullAddrStr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
# Create MultiAddress and PeerId
let destAddr = MultiAddress.init(parts[0]).valueOr:
error "Failed to parse location multiaddress: ", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
let destPeerId = PeerId.init(parts[1]).valueOr:
error "Failed to initialize PeerId", err = error
mix_messages_error.inc(labelValues = ["Exit", "INVALID_DEST"])
return
when defined(enable_mix_benchmarks):
benchmarkLog "Exit",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.some(destPeerId)
await mixProto.exitLayer.onMessage(
deserialized.codec, message, processedSP.destination, surbs
deserialized.codec, message, destAddr, destPeerId, surbs
)
mix_messages_forwarded.inc(labelValues = ["Exit"])
@@ -172,6 +258,15 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Reply", "INVALID_SPHINX"])
return
when defined(enable_mix_benchmarks):
benchmarkLog "Reply",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.none(PeerId)
await connCred.incoming.put(deserialized.message)
else:
error "could not process reply", id = processedSP.id
@@ -212,9 +307,22 @@ proc handleMixNodeConnection(
mix_messages_error.inc(labelValues = ["Intermediate", "INVALID_NEXTHOP"])
return
when defined(enable_mix_benchmarks):
benchmarkLog "Intermediate",
mixProto.switch.peerInfo.peerId,
startTime,
msgId,
orig,
Opt.some(fromPeerId),
Opt.some(peerId)
var nextHopConn: Connection
try:
nextHopConn = await mixProto.switch.dial(peerId, @[locationAddr], MixProtocolID)
when defined(enable_mix_benchmarks):
await nextHopConn.writeLp(metadata)
await nextHopConn.writeLp(processedSP.serializedSphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Intermediate"])
except CatchableError as e:
@@ -342,9 +450,25 @@ proc prepareMsgWithSurbs(
ok(serialized)
type SendPacketType* = enum
Entry
Reply
type SendPacketConfig = object
logType: SendPacketType
when defined(enable_mix_benchmarks):
startTime: Moment
orig: uint64
msgId: uint64
origAndMsgId: seq[byte]
proc sendPacket(
mixProto: MixProtocol, multiAddrs: string, sphinxPacket: seq[byte], label: string
mixProto: MixProtocol,
multiAddrs: string,
sphinxPacket: seq[byte],
config: SendPacketConfig,
) {.async: (raises: []).} =
let label = $config.logType
# Send the wrapped message to the first mix node in the selected path
let parts = multiAddrs.split("/p2p/")
if parts.len != 2:
@@ -362,10 +486,24 @@ proc sendPacket(
mix_messages_error.inc(labelValues = [label, "NON_RECOVERABLE"])
return
when defined(enable_mix_benchmarks):
if config.logType == Entry:
benchmarkLog "Sender",
mixProto.switch.peerInfo.peerId,
config.startTime,
config.msgId,
config.orig,
Opt.none(PeerId),
Opt.some(firstMixPeerId)
var nextHopConn: Connection
try:
nextHopConn =
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])
when defined(enable_mix_benchmarks):
await nextHopConn.writeLp(config.origAndMsgId)
await nextHopConn.writeLp(sphinxPacket)
mix_messages_forwarded.inc(labelValues = ["Entry"])
except CatchableError as e:
@@ -420,8 +558,18 @@ proc anonymizeLocalProtocolSend*(
destination: MixDestination,
numSurbs: uint8,
) {.async.} =
var config = SendPacketConfig(logType: Entry)
when defined(enable_mix_benchmarks):
config.startTime = Moment.now()
let (multiAddr, _, _, _, _) = getMixNodeInfo(mixProto.mixNodeInfo)
when defined(enable_mix_benchmarks):
# Assumes a fixed gossipsub message layout of 100
config.orig = uint64.fromBytesLE(msg[5 ..< 13])
config.msgId = uint64.fromBytesLE(msg[13 ..< 21])
config.origAndMsgId = msg[5 ..< 21]
mix_messages_recvd.inc(labelValues = ["Entry"])
var
@@ -521,7 +669,7 @@ proc anonymizeLocalProtocolSend*(
return
# Send the wrapped message to the first mix node in the selected path
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, "Entry")
await mixProto.sendPacket(multiAddrs[0], sphinxPacket, config)
proc reply(
mixProto: MixProtocol, surb: SURB, msg: seq[byte]
@@ -539,7 +687,7 @@ proc reply(
error "Use SURB error", err = error
return
await mixProto.sendPacket(multiAddr, sphinxPacket, "Reply")
await mixProto.sendPacket(multiAddr, sphinxPacket, SendPacketConfig(logType: Reply))
proc new*(
T: typedesc[MixProtocol],