mirror of
https://github.com/vacp2p/mix.git
synced 2026-01-09 21:18:01 -05:00
add: upstream the node tester.
* Add: Bring in test-node infrastructure from dst-gossipsub-test-node * fix: Log metadata info correctly * fix: message length issue * add: check at massively scaled msgid vals * fix: correctly report origin timestamu * fix: Log the origin-time correctly * refactor: comment out byte-reader
This commit is contained in:
75
Dockerfile
Normal file
75
Dockerfile
Normal file
@@ -0,0 +1,75 @@
|
||||
# Build nim
|
||||
FROM debian:bookworm-slim AS build_nim
|
||||
|
||||
WORKDIR /
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
RUN apt-get update && apt install -y git curl build-essential bash ca-certificates libssl-dev
|
||||
|
||||
ENV NIM_VERSION=version-2-0
|
||||
|
||||
RUN curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_nim.sh
|
||||
|
||||
RUN env MAKE="make -j$(nproc)" \
|
||||
ARCH_OVERRIDE=amd64 \
|
||||
NIM_COMMIT=$NIM_VERSION \
|
||||
QUICK_AND_DIRTY_COMPILER=1 \
|
||||
QUICK_AND_DIRTY_NIMBLE=1 \
|
||||
CC=gcc \
|
||||
bash build_nim.sh nim csources dist/nimble NimBinaries
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Build the app
|
||||
FROM debian:bookworm-slim AS build_app
|
||||
|
||||
WORKDIR /node
|
||||
|
||||
# Copy nim
|
||||
COPY --from=build_nim /nim /nim
|
||||
|
||||
ENV PATH="/nim/bin:${PATH}"
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
RUN apt-get update && apt install -y git build-essential bash ca-certificates libssl-dev
|
||||
|
||||
# Configure git and install dependencies
|
||||
RUN git config --global http.sslVerify false
|
||||
|
||||
# Copy only files needed to install Nimble deps (optimizes layer caching)
|
||||
COPY mix.nimble .
|
||||
RUN nimble install -y --depsOnly.
|
||||
|
||||
# Copy full source AFTER deps are cached
|
||||
COPY . .
|
||||
|
||||
# Compile the Nim application
|
||||
RUN nimble c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
|
||||
|
||||
# =============================================================================
|
||||
# Run the app
|
||||
FROM debian:bookworm AS prod
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
RUN apt update && apt -y install cron libpcre3 libssl-dev
|
||||
|
||||
# Set the working directory
|
||||
WORKDIR /node
|
||||
|
||||
# Copy the compiled binary from the build stage
|
||||
COPY --from=build_app /node/main /node/main
|
||||
|
||||
COPY cron_runner.sh .
|
||||
|
||||
RUN chmod +x cron_runner.sh
|
||||
RUN chmod +x main
|
||||
|
||||
EXPOSE 5000 8008
|
||||
|
||||
ENV FILEPATH=/data
|
||||
VOLUME ["/data"]
|
||||
|
||||
ENTRYPOINT ["./cron_runner.sh"]
|
||||
22
cron_runner.sh
Normal file
22
cron_runner.sh
Normal file
@@ -0,0 +1,22 @@
|
||||
#!/bin/bash
|
||||
|
||||
if [ "$#" -ne 2 ]; then
|
||||
echo "Usage: $0 <minutes> <hours>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
minutes="$1"
|
||||
hours="$2"
|
||||
|
||||
cron_expression="$minutes $hours * * *"
|
||||
|
||||
cron_job_file="/etc/cron.d/my-cron-job"
|
||||
echo -e "$cron_expression /node/main > /proc/1/fd/1 2>&1 \n" > "$cron_job_file"
|
||||
|
||||
echo "Cron job file created at $cron_job_file"
|
||||
|
||||
env >> /etc/environment
|
||||
|
||||
crontab /etc/cron.d/my-cron-job
|
||||
|
||||
cron -f
|
||||
330
main.nim
Normal file
330
main.nim
Normal file
@@ -0,0 +1,330 @@
|
||||
import chronos, chronicles, hashes, math, sequtils, strutils, tables, os
|
||||
import nimcrypto/sysrand
|
||||
import metrics, metrics/chronos_httpserver
|
||||
import stew/[byteutils, endians2]
|
||||
import
|
||||
std/[
|
||||
enumerate, options, strformat, sysrand, os, sequtils, dirs, parseutils, random,
|
||||
posix, algorithm,
|
||||
]
|
||||
import node
|
||||
import json
|
||||
import
|
||||
mix/[
|
||||
entry_connection, entry_connection_callbacks, mix_node, mix_protocol, protocol,
|
||||
utils,
|
||||
]
|
||||
import
|
||||
libp2p,
|
||||
libp2p/[
|
||||
crypto/secp,
|
||||
multiaddress,
|
||||
builders,
|
||||
muxers/mplex/lpchannel,
|
||||
protocols/pubsub/gossipsub,
|
||||
protocols/pubsub/pubsubpeer,
|
||||
protocols/pubsub/rpc/messages,
|
||||
transports/tcptransport,
|
||||
]
|
||||
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
|
||||
from nativesockets import getHostname
|
||||
|
||||
proc createSwitch(id, port: int, isMix: bool, filePath: string): Switch =
|
||||
{.gcsafe.}:
|
||||
var
|
||||
multiAddrStr: string
|
||||
libp2pPubKey: SkPublicKey
|
||||
libp2pPrivKey: SkPrivateKey
|
||||
|
||||
if isMix:
|
||||
discard initializeMixNodes(1, port)
|
||||
|
||||
let mixNodeInfo = getMixNodeInfo(mixNodes[0])
|
||||
multiAddrStr = mixNodeInfo[0]
|
||||
libp2pPubKey = mixNodeInfo[3]
|
||||
libp2pPrivKey = mixNodeInfo[4]
|
||||
else:
|
||||
discard initializeNodes(1, port)
|
||||
|
||||
(multiAddrStr, libp2pPubKey, libp2pPrivKey) = getNodeInfo(nodes[0])
|
||||
|
||||
let multiAddrParts = multiAddrStr.split("/p2p/")
|
||||
let multiAddr = MultiAddress.init(multiAddrParts[0]).valueOr:
|
||||
error "Failed to initialize MultiAddress", err = error
|
||||
return
|
||||
|
||||
let switch = SwitchBuilder
|
||||
.new()
|
||||
.withPrivateKey(PrivateKey(scheme: Secp256k1, skkey: libp2pPrivKey))
|
||||
.withAddress(multiAddr)
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTcpTransport()
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
if switch.isNil:
|
||||
warn "Failed to set up node", nodeId = id
|
||||
return
|
||||
|
||||
let addresses = getInterfaces().filterIt(it.name == "eth0").mapIt(it.addresses)
|
||||
if addresses.len < 1 or addresses[0].len < 1:
|
||||
error "Can't find local ip!"
|
||||
return
|
||||
|
||||
let
|
||||
externalAddr = ($addresses[0][0].host).split(":")[0]
|
||||
peerId = switch.peerInfo.peerId
|
||||
externalMultiAddr = fmt"/ip4/{externalAddr}/tcp/{port}/p2p/{peerId}"
|
||||
|
||||
if isMix:
|
||||
discard initMixMultiAddrByIndex(0, externalMultiAddr)
|
||||
let writeNodeRes =
|
||||
writeMixNodeInfoToFile(mixNodes[0], id, filePath / fmt"nodeInfo")
|
||||
if writeNodeRes.isErr:
|
||||
error "Failed to write mix info to file", nodeId = id, err = writeNodeRes.error
|
||||
return
|
||||
|
||||
let nodePubInfo = getMixPubInfoByIndex(0).valueOr:
|
||||
error "Get mix pub info by index error", err = error
|
||||
return
|
||||
|
||||
let writeMixPubInfoRes =
|
||||
writeMixPubInfoToFile(nodePubInfo, id, filePath / fmt"pubInfo")
|
||||
if writeMixPubInfoRes.isErr:
|
||||
error "Failed to write mix pub info to file", nodeId = id
|
||||
return
|
||||
|
||||
let pubInfo = initPubInfo(externalMultiAddr, libp2pPubKey)
|
||||
|
||||
let writePubInfoRes = writePubInfoToFile(pubInfo, id, filePath / fmt"libp2pPubInfo")
|
||||
if writePubInfoRes.isErr:
|
||||
error "Failed to write pub info to file", nodeId = id
|
||||
return
|
||||
|
||||
return switch
|
||||
|
||||
proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
|
||||
return ok(($m.data.hash).toBytes())
|
||||
|
||||
proc startMetricsServer(
|
||||
serverIp: IpAddress, serverPort: Port
|
||||
): Result[MetricsHttpServerRef, string] =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
|
||||
|
||||
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||
if metricsServerRes.isErr():
|
||||
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||
|
||||
let server = metricsServerRes.value
|
||||
try:
|
||||
waitFor server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
|
||||
ok(metricsServerRes.value)
|
||||
|
||||
const uidLen = 32
|
||||
|
||||
func byteToHex(b: byte): string =
|
||||
b.toHex(2)
|
||||
|
||||
proc main() {.async.} =
|
||||
randomize()
|
||||
|
||||
let
|
||||
hostname = getHostname()
|
||||
node_count = parseInt(getEnv("NODES"))
|
||||
msg_rate = parseInt(getEnv("MSGRATE"))
|
||||
msg_size = parseInt(getEnv("MSGSIZE"))
|
||||
publisherCount = parseInt(getEnv("PUBLISHERS"))
|
||||
mixCount = publisherCount # Publishers will be the mix nodes for now
|
||||
connectTo = parseInt(getEnv("CONNECTTO"))
|
||||
filePath = getEnv("FILEPATH", "./")
|
||||
rng = libp2p.newRng()
|
||||
|
||||
if publisherCount > node_count:
|
||||
error "Publisher count is greater than total node count"
|
||||
return
|
||||
|
||||
info "Hostname", host = hostname
|
||||
let myId = getHostname().split('-')[^1].parseInt()
|
||||
info "ID", id = myId
|
||||
|
||||
let
|
||||
isPublisher = myId < publisherCount
|
||||
# [0..<publisherCount] contains all the publishers
|
||||
isMix = isPublisher # Publishers will be the mix nodes for now
|
||||
myport = parseInt(getEnv("PORT", "5000"))
|
||||
switch = createSwitch(myId, myport, isMix, filePath)
|
||||
|
||||
await sleepAsync(10.seconds)
|
||||
|
||||
var gossipSub: GossipSub
|
||||
|
||||
if isMix:
|
||||
let mixProto = MixProtocol.new(myId, mixCount, switch, filePath).expect(
|
||||
"could not instantiate mix"
|
||||
)
|
||||
|
||||
let mixConn = proc(
|
||||
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
|
||||
): Connection {.gcsafe, raises: [].} =
|
||||
try:
|
||||
return mixProto.createMixEntryConnection(destAddr, destPeerId, codec)
|
||||
except CatchableError as e:
|
||||
error "Error during execution of MixEntryConnection callback", err = e.msg
|
||||
return nil
|
||||
|
||||
let mixPeerSelect = proc(
|
||||
allPeers: HashSet[PubSubPeer],
|
||||
directPeers: HashSet[PubSubPeer],
|
||||
meshPeers: HashSet[PubSubPeer],
|
||||
fanoutPeers: HashSet[PubSubPeer],
|
||||
): HashSet[PubSubPeer] {.gcsafe, raises: [].} =
|
||||
try:
|
||||
return mixPeerSelection(allPeers, directPeers, meshPeers, fanoutPeers)
|
||||
except CatchableError as e:
|
||||
error "Error during execution of MixPeerSelection callback", err = e.msg
|
||||
return initHashSet[PubSubPeer]()
|
||||
|
||||
gossipSub = GossipSub.init(
|
||||
switch = switch,
|
||||
triggerSelf = true,
|
||||
msgIdProvider = msgIdProvider,
|
||||
verifySignature = false,
|
||||
anonymize = true,
|
||||
customConnCallbacks = some(
|
||||
CustomConnectionCallbacks(
|
||||
customConnCreationCB: mixConn, peerSelectionCB: mixPeerSelect
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
switch.mount(mixProto)
|
||||
else:
|
||||
gossipSub = GossipSub.init(
|
||||
switch = switch,
|
||||
triggerSelf = true,
|
||||
msgIdProvider = msgIdProvider,
|
||||
verifySignature = false,
|
||||
anonymize = true,
|
||||
)
|
||||
|
||||
# Metrics
|
||||
info "Starting metrics HTTP server"
|
||||
let metricsServer = startMetricsServer(parseIpAddress("0.0.0.0"), Port(8008))
|
||||
|
||||
gossipSub.parameters.floodPublish = true
|
||||
gossipSub.parameters.opportunisticGraftThreshold = -10000
|
||||
gossipSub.parameters.heartbeatInterval = 1.seconds
|
||||
gossipSub.parameters.pruneBackoff = 60.seconds
|
||||
gossipSub.parameters.gossipFactor = 0.25
|
||||
gossipSub.parameters.d = 6
|
||||
gossipSub.parameters.dLow = 4
|
||||
gossipSub.parameters.dHigh = 8
|
||||
gossipSub.parameters.dScore = 6
|
||||
gossipSub.parameters.dOut = 6 div 2
|
||||
gossipSub.parameters.dLazy = 6
|
||||
gossipSub.topicParams["test"] = TopicParams(
|
||||
topicWeight: 1,
|
||||
firstMessageDeliveriesWeight: 1,
|
||||
firstMessageDeliveriesCap: 30,
|
||||
firstMessageDeliveriesDecay: 0.9,
|
||||
)
|
||||
|
||||
proc messageHandler(topic: string, data: seq[byte]) {.async.} =
|
||||
if data.len < 16:
|
||||
warn "Message too short"
|
||||
return
|
||||
|
||||
let
|
||||
timestampNs = uint64.fromBytesLE(data[0 ..< 8])
|
||||
msgId = uint64.fromBytesLE(data[8 ..< 16])
|
||||
sentMoment = nanoseconds(int64(timestampNs))
|
||||
sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds))
|
||||
sentDate = initTime(sentMoment.seconds, sentNanosecs)
|
||||
recvTime = getTime()
|
||||
delay = recvTime - sentDate
|
||||
|
||||
info "Received message", msgId = msgId, sentAt = timestampNs, delayMs = delay.inMilliseconds()
|
||||
|
||||
proc messageValidator(
|
||||
topic: string, msg: Message
|
||||
): Future[ValidationResult] {.async.} =
|
||||
return ValidationResult.Accept
|
||||
|
||||
gossipSub.subscribe("test", messageHandler)
|
||||
gossipSub.addValidator(["test"], messageValidator)
|
||||
switch.mount(gossipSub)
|
||||
await switch.start()
|
||||
|
||||
info "Listening", addrs = switch.peerInfo.addrs
|
||||
|
||||
let sleeptime = 20
|
||||
info "Waiting for: ", time = sleeptime
|
||||
|
||||
await sleepAsync(sleeptime.seconds)
|
||||
|
||||
var connected = 0
|
||||
var addrs: seq[MultiAddress]
|
||||
|
||||
for i in 0 ..< node_count:
|
||||
if i == myId:
|
||||
continue
|
||||
|
||||
let pubInfo = readPubInfoFromFile(i, filePath / fmt"libp2pPubInfo").expect(
|
||||
"should be able to read pubinfo"
|
||||
)
|
||||
let (multiAddr, _) = getPubInfo(pubInfo)
|
||||
let ma = MultiAddress.init(multiAddr).expect("should be a multiaddr")
|
||||
addrs.add ma
|
||||
|
||||
rng.shuffle(addrs)
|
||||
var index = 0
|
||||
while true:
|
||||
if connected >= connectTo:
|
||||
break
|
||||
while true:
|
||||
try:
|
||||
info "Trying to connect", addrs = addrs[index]
|
||||
let peerId =
|
||||
await switch.connect(addrs[index], allowUnknownPeerId = true).wait(5.seconds)
|
||||
connected.inc()
|
||||
index.inc()
|
||||
info "Connected!"
|
||||
break
|
||||
except CatchableError as exc:
|
||||
error "Failed to dial", err = exc.msg
|
||||
info "Waiting 15 seconds..."
|
||||
await sleepAsync(15.seconds)
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
info "Mesh size", meshSize = gossipSub.mesh.getOrDefault("test").len
|
||||
|
||||
info "Publishing turn", id = myId
|
||||
|
||||
let count = 50
|
||||
for msg in high(int) - count ..< high(int): #client.param(int, "message_count"):
|
||||
if msg mod publisherCount == myId:
|
||||
# info "Sending message", time = times.getTime()
|
||||
let now = getTime()
|
||||
let timestampNs = now.toUnix().int64 * 1_000_000_000 + times.nanosecond(now).int64
|
||||
let msgId = uint64(msg)
|
||||
|
||||
var payload: seq[byte]
|
||||
payload.add(toBytesLE(uint64(timestampNs)))
|
||||
payload.add(toBytesLE(msgId))
|
||||
payload.add(newSeq[byte](msg_size - 16)) # Fill the rest with padding
|
||||
|
||||
info "Publishing", msgId = msgId, timestamp = timestampNs
|
||||
|
||||
let pub_res = await gossipSub.publish("test", payload, useCustomConn = true)
|
||||
if pub_res <= 0:
|
||||
error "publish fail", res = pub_res
|
||||
doAssert(pub_res > 0)
|
||||
await sleepAsync(msg_rate)
|
||||
|
||||
waitFor(main())
|
||||
@@ -1,9 +1,11 @@
|
||||
packageName = "mix"
|
||||
version = "0.1.0"
|
||||
author = "Akshaya"
|
||||
description = "A custom Mix Protocol"
|
||||
license = "MIT"
|
||||
|
||||
# Dependencies
|
||||
requires "ggplotnim"
|
||||
requires "stew >= 0.3.0"
|
||||
requires "chronos >= 4.0.3"
|
||||
requires "https://github.com/AkshayaMani/nim-libp2p#gossipsub-custom-conn"
|
||||
@@ -32,4 +34,4 @@ task test, "Run the test suite":
|
||||
runTest("test_serialization")
|
||||
runTest("test_sphinx")
|
||||
runTest("test_tag_manager")
|
||||
runTest("test_utils")
|
||||
runTest("test_utils")
|
||||
|
||||
@@ -57,6 +57,12 @@ proc cryptoRandomInt(max: int): Result[int, string] =
|
||||
proc toUnixNs(t: Time): int64 =
|
||||
t.toUnix().int64 * 1_000_000_000 + times.nanosecond(t).int64
|
||||
|
||||
# func byteToHex(b: byte): string =
|
||||
# b.toHex(2)
|
||||
# func bytesToHex(data: seq[byte]): string =
|
||||
# data.map(byteToHex).join(" ")
|
||||
|
||||
|
||||
proc handleMixNodeConnection(
|
||||
mixProto: MixProtocol, conn: Connection
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
@@ -65,7 +71,7 @@ proc handleMixNodeConnection(
|
||||
metadata: seq[byte]
|
||||
fromPeerID: string
|
||||
try:
|
||||
metadata = await conn.readLp(16)
|
||||
metadata = await conn.readLp(21)
|
||||
receivedBytes = await conn.readLp(packetSize)
|
||||
fromPeerID = shortLog(conn.peerId)
|
||||
except Exception as e:
|
||||
@@ -98,11 +104,11 @@ proc handleMixNodeConnection(
|
||||
error "Failed to initialize my PeerId", err = error
|
||||
return
|
||||
|
||||
let
|
||||
orig = uint64.fromBytesLE(metadata[0 ..< 8])
|
||||
msgid = uint64.fromBytesLE(metadata[8 ..< 16])
|
||||
myPeerId = shortLog(ownPeerId)
|
||||
|
||||
let
|
||||
orig = uint64.fromBytesLE(metadata[5 ..< 13])
|
||||
msgid = uint64.fromBytesLE(metadata[13 ..< 21])
|
||||
myPeerId = shortLog(ownPeerId)
|
||||
case status
|
||||
of Exit:
|
||||
if (nextHop != Hop()) or (delay != @[]):
|
||||
@@ -142,7 +148,6 @@ proc handleMixNodeConnection(
|
||||
info "Exit", msgid=msgid, fromPeerID=fromPeerID, toPeerID="None", myPeerId=myPeerId, orig=orig, current=startTimeNs, procDelay=processingDelay
|
||||
|
||||
of Success:
|
||||
trace "# Intermediate: ", multiAddr = multiAddr
|
||||
# Add delay
|
||||
let delayMillis = (delay[0].int shl 8) or delay[1].int
|
||||
await sleepAsync(milliseconds(delayMillis))
|
||||
@@ -299,8 +304,9 @@ proc anonymizeLocalProtocolSend*(
|
||||
return
|
||||
|
||||
let
|
||||
orig = uint64.fromBytesLE(msg[0 ..< 8])
|
||||
msgid = uint64.fromBytesLE(msg[8 ..< 16])
|
||||
orig = uint64.fromBytesLE(msg[5 ..< 13])
|
||||
# whats happening bytes 8..13
|
||||
msgid = uint64.fromBytesLE(msg[13 ..< 21])
|
||||
toPeerID = shortLog(firstMixPeerId)
|
||||
myPeerId = shortLog(ownPeerId)
|
||||
endTime = getTime()
|
||||
@@ -313,7 +319,7 @@ proc anonymizeLocalProtocolSend*(
|
||||
try:
|
||||
nextHopConn =
|
||||
await mixProto.switch.dial(firstMixPeerId, @[firstMixAddr], @[MixProtocolID])
|
||||
await nextHopConn.writeLp(msg[0 ..< 16])
|
||||
await nextHopConn.writeLp(msg[0 ..< 21])
|
||||
await nextHopConn.writeLp(sphinxPacket)
|
||||
except CatchableError as e:
|
||||
error "Failed to send message to next hop: ", err = e.msg
|
||||
|
||||
32
mixrunner.sh
Executable file
32
mixrunner.sh
Executable file
@@ -0,0 +1,32 @@
|
||||
#! /usr/bin/env nix-shell
|
||||
#! nix-shell -i bash -p bash
|
||||
|
||||
N="$1"
|
||||
DATADIR="$2"
|
||||
|
||||
if [[ -z "$N" || -z "$DATADIR" ]]; then
|
||||
echo "usage: $0 <N> <datadir>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p "$DATADIR"
|
||||
if [[ "$(ls -A "$DATADIR")" ]]; then
|
||||
echo "error: $DATADIR is not empty"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
for i in $(seq 0 $((N-1))); do
|
||||
docker run \
|
||||
-d \
|
||||
--name node-$i \
|
||||
--hostname node-$i \
|
||||
-v "$DATADIR":/data \
|
||||
-e NODES="$N" \
|
||||
-e MSGRATE=10 \
|
||||
-e MSGSIZE=20 \
|
||||
-e PUBLISHERS=5 \
|
||||
-e CONNECTTO=4 \
|
||||
-e LOG_LEVEL=TRACE \
|
||||
--entrypoint /node/main \
|
||||
mixrunner
|
||||
done
|
||||
164
node.nim
Normal file
164
node.nim
Normal file
@@ -0,0 +1,164 @@
|
||||
import strformat, os
|
||||
import std/streams
|
||||
import mix/[config, utils]
|
||||
import libp2p/[crypto/crypto, crypto/secp, multiaddress, peerid]
|
||||
|
||||
const NodeInfoSize* = addrSize + (SkRawPublicKeySize + SkRawPrivateKeySize)
|
||||
const PubInfoSize* = addrSize + SkRawPublicKeySize
|
||||
|
||||
type NodeInfo* = object
|
||||
multiAddr: string
|
||||
libp2pPubKey: SkPublicKey
|
||||
libp2pPrivKey: SkPrivateKey
|
||||
|
||||
var nodes*: seq[NodeInfo] = @[]
|
||||
|
||||
proc initNodeInfo*(
|
||||
multiAddr: string, libp2pPubKey: SkPublicKey, libp2pPrivKey: SkPrivateKey
|
||||
): NodeInfo =
|
||||
NodeInfo(
|
||||
multiAddr: multiAddr, libp2pPubKey: libp2pPubKey, libp2pPrivKey: libp2pPrivKey
|
||||
)
|
||||
|
||||
proc getNodeInfo*(info: NodeInfo): (string, SkPublicKey, SkPrivateKey) =
|
||||
(info.multiAddr, info.libp2pPubKey, info.libp2pPrivKey)
|
||||
|
||||
proc serializeNodeInfo*(nodeInfo: NodeInfo): Result[seq[byte], string] =
|
||||
let addrBytes = multiAddrToBytes(nodeInfo.multiAddr).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
|
||||
let
|
||||
libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
|
||||
libp2pPrivKeyBytes = nodeInfo.libp2pPrivKey.getBytes()
|
||||
|
||||
return ok(addrBytes & libp2pPubKeyBytes & libp2pPrivKeyBytes)
|
||||
|
||||
proc deserializeNodeInfo*(data: openArray[byte]): Result[NodeInfo, string] =
|
||||
if len(data) != NodeInfoSize:
|
||||
return err("Serialized node info must be exactly " & $NodeInfoSize & " bytes")
|
||||
|
||||
let multiAddr = bytesToMultiAddr(data[0 .. addrSize - 1]).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
|
||||
let libp2pPubKey = SkPublicKey.init(
|
||||
data[addrSize .. addrSize + SkRawPublicKeySize - 1]
|
||||
).valueOr:
|
||||
return err("Failed to initialize libp2p public key")
|
||||
|
||||
let libp2pPrivKey = SkPrivateKey.init(data[addrSize + SkRawPublicKeySize ..^ 1]).valueOr:
|
||||
return err("Failed to initialize libp2p private key")
|
||||
|
||||
ok(
|
||||
NodeInfo(
|
||||
multiAddr: multiAddr, libp2pPubKey: libp2pPubKey, libp2pPrivKey: libp2pPrivKey
|
||||
)
|
||||
)
|
||||
|
||||
type PubInfo* = object
|
||||
multiAddr: string
|
||||
libp2pPubKey: SkPublicKey
|
||||
|
||||
proc initPubInfo*(multiAddr: string, libp2pPubKey: SkPublicKey): PubInfo =
|
||||
PubInfo(multiAddr: multiAddr, libp2pPubKey: libp2pPubKey)
|
||||
|
||||
proc getPubInfo*(info: PubInfo): (string, SkPublicKey) =
|
||||
(info.multiAddr, info.libp2pPubKey)
|
||||
|
||||
proc serializePubInfo*(nodeInfo: PubInfo): Result[seq[byte], string] =
|
||||
let addrBytes = multiAddrToBytes(nodeInfo.multiAddr).valueOr:
|
||||
return err("Error in multiaddress conversion to bytes: " & error)
|
||||
let libp2pPubKeyBytes = nodeInfo.libp2pPubKey.getBytes()
|
||||
|
||||
return ok(addrBytes & libp2pPubKeyBytes)
|
||||
|
||||
proc deserializePubInfo*(data: openArray[byte]): Result[PubInfo, string] =
|
||||
if len(data) != PubInfoSize:
|
||||
return err("Serialized public info must be exactly " & $PubInfoSize & " bytes")
|
||||
|
||||
let multiAddr = bytesToMultiAddr(data[0 .. addrSize - 1]).valueOr:
|
||||
return err("Error in bytes to multiaddress conversion: " & error)
|
||||
|
||||
let libp2pPubKey = SkPublicKey.init(data[addrSize ..^ 1]).valueOr:
|
||||
return err("Failed to initialize libp2p public key: ")
|
||||
|
||||
ok(PubInfo(multiAddr: multiAddr, libp2pPubKey: libp2pPubKey))
|
||||
|
||||
proc writePubInfoToFile*(
|
||||
node: PubInfo, index: int, pubInfoFolderPath: string = "./libp2pPubInfo"
|
||||
): Result[void, string] =
|
||||
if not dirExists(pubInfoFolderPath):
|
||||
createDir(pubInfoFolderPath)
|
||||
let filename = pubInfoFolderPath / fmt"node_{index}"
|
||||
var file = newFileStream(filename, fmWrite)
|
||||
if file == nil:
|
||||
return err("Failed to create file stream for " & filename)
|
||||
defer:
|
||||
file.close()
|
||||
|
||||
let serializedData = serializePubInfo(node).valueOr:
|
||||
return err("Failed to serialize pub info: " & error)
|
||||
|
||||
file.writeData(addr serializedData[0], serializedData.len)
|
||||
return ok()
|
||||
|
||||
proc readPubInfoFromFile*(
|
||||
index: int, pubInfoFolderPath: string = "./libp2pPubInfo"
|
||||
): Result[PubInfo, string] =
|
||||
try:
|
||||
let filename = pubInfoFolderPath / fmt"node_{index}"
|
||||
if not fileExists(filename):
|
||||
return err("File does not exist")
|
||||
var file = newFileStream(filename, fmRead)
|
||||
if file == nil:
|
||||
return err(
|
||||
"Failed to open file: " & filename &
|
||||
". Check permissions or if the path is correct."
|
||||
)
|
||||
defer:
|
||||
file.close()
|
||||
let data = file.readAll()
|
||||
if data.len != PubInfoSize:
|
||||
return err(
|
||||
"Invalid data size for NodeInfo: expected " & $NodeInfoSize & " bytes, but got " &
|
||||
$(data.len) & " bytes."
|
||||
)
|
||||
let dPubInfo = deserializePubInfo(cast[seq[byte]](data)).valueOr:
|
||||
return err("Pub info deserialize error: " & error)
|
||||
return ok(dPubInfo)
|
||||
except IOError as e:
|
||||
return err("File read error: " & $e.msg)
|
||||
except OSError as e:
|
||||
return err("OS error: " & $e.msg)
|
||||
|
||||
proc deletePubInfoFolder*(pubInfoFolderPath: string = "./libp2pPubInfo") =
|
||||
if dirExists(pubInfoFolderPath):
|
||||
removeDir(pubInfoFolderPath)
|
||||
|
||||
proc getPubInfoByIndex*(index: int): Result[PubInfo, string] =
|
||||
if index < 0 or index >= nodes.len:
|
||||
return err("Index must be between 0 and " & $(nodes.len))
|
||||
ok(
|
||||
PubInfo(multiAddr: nodes[index].multiAddr, libp2pPubKey: nodes[index].libp2pPubKey)
|
||||
)
|
||||
|
||||
proc generateNodes(count: int, basePort: int = 4242): Result[seq[NodeInfo], string] =
|
||||
var nodes = newSeq[NodeInfo](count)
|
||||
for i in 0 ..< count:
|
||||
let
|
||||
rng = newRng()
|
||||
keyPair = SkKeyPair.random(rng[])
|
||||
libp2pPrivKey = keyPair.seckey
|
||||
libp2pPubKey = keyPair.pubkey
|
||||
pubKeyProto = PublicKey(scheme: Secp256k1, skkey: libp2pPubKey)
|
||||
peerId = PeerId.init(pubKeyProto).get()
|
||||
multiAddr = fmt"/ip4/0.0.0.0/tcp/{basePort + i}/p2p/{peerId}"
|
||||
|
||||
nodes[i] = NodeInfo(
|
||||
multiAddr: multiAddr, libp2pPubKey: libp2pPubKey, libp2pPrivKey: libp2pPrivKey
|
||||
)
|
||||
|
||||
ok(nodes)
|
||||
|
||||
proc initializeNodes*(count: int, basePort: int = 4242): Result[void, string] =
|
||||
nodes = generateNodes(count, basePort).valueOr:
|
||||
return err("Node initialization error: " & error)
|
||||
Reference in New Issue
Block a user