Compare commits

..

8 Commits

Author SHA1 Message Date
alrevuelta
5782cf5901 Fix typo + break outer loop 2023-09-01 12:12:37 +02:00
alrevuelta
40648c106f Add validationStrategy Paralel/Seq + ordered validators 2023-09-01 11:35:49 +02:00
Jacek Sieka
b76bac752f avoid importing ecnist when not needed (#942) 2023-08-30 11:39:48 +02:00
diegomrsantos
c6aa085e98 Prevent concurrent IWANT of the same message (#943) 2023-08-21 16:34:24 +02:00
Ludovic Chenut
e03547ea3e Perf protocol (#925) 2023-08-14 17:25:55 +02:00
diegomrsantos
f80ce3133c Bandwidth estimate as a parameter (#941) 2023-08-14 17:03:46 +02:00
Tanguy
d6263bf751 nim-websock new version compatibility (#939) 2023-08-02 17:10:31 +02:00
Tanguy
56c23a286a Add specs crypto tests (#938) 2023-08-01 15:28:38 +02:00
17 changed files with 542 additions and 5281 deletions

View File

@@ -65,11 +65,13 @@ when supported(PKScheme.Ed25519):
import ed25519/ed25519
when supported(PKScheme.Secp256k1):
import secp
when supported(PKScheme.ECDSA):
import ecnist
# We are still importing `ecnist` because, it is used for SECIO handshake,
# but it will be impossible to create ECNIST keys or import ECNIST keys.
# These used to be declared in `crypto` itself
export ecnist.ephemeral, ecnist.ECDHEScheme
import ecnist, bearssl/rand, bearssl/hash as bhash
import bearssl/rand, bearssl/hash as bhash
import ../protobuf/minprotobuf, ../vbuffer, ../multihash, ../multicodec
import nimcrypto/[rijndael, twofish, sha2, hash, hmac]
# We use `ncrutils` for constant-time hexadecimal encoding/decoding procedures.
@@ -86,8 +88,6 @@ type
Sha256,
Sha512
ECDHEScheme* = EcCurveKind
PublicKey* = object
case scheme*: PKScheme
of PKScheme.RSA:
@@ -879,34 +879,6 @@ proc mac*(secret: Secret, id: int): seq[byte] {.inline.} =
offset += secret.ivsize + secret.keysize
copyMem(addr result[0], unsafeAddr secret.data[offset], secret.macsize)
proc ephemeral*(
scheme: ECDHEScheme,
rng: var HmacDrbgContext): CryptoResult[EcKeyPair] =
## Generate ephemeral keys used to perform ECDHE.
var keypair: EcKeyPair
if scheme == Secp256r1:
keypair = ? EcKeyPair.random(Secp256r1, rng).orError(KeyError)
elif scheme == Secp384r1:
keypair = ? EcKeyPair.random(Secp384r1, rng).orError(KeyError)
elif scheme == Secp521r1:
keypair = ? EcKeyPair.random(Secp521r1, rng).orError(KeyError)
ok(keypair)
proc ephemeral*(
scheme: string, rng: var HmacDrbgContext): CryptoResult[EcKeyPair] =
## Generate ephemeral keys used to perform ECDHE using string encoding.
##
## Currently supported encoding strings are P-256, P-384, P-521, if encoding
## string is not supported P-521 key will be generated.
if scheme == "P-256":
ephemeral(Secp256r1, rng)
elif scheme == "P-384":
ephemeral(Secp384r1, rng)
elif scheme == "P-521":
ephemeral(Secp521r1, rng)
else:
ephemeral(Secp521r1, rng)
proc getOrder*(remotePubkey, localNonce: openArray[byte],
localPubkey, remoteNonce: openArray[byte]): CryptoResult[int] =
## Compare values and calculate `order` parameter.

View File

@@ -994,3 +994,33 @@ proc verify*[T: byte|char](sig: EcSignature, message: openArray[T],
# Clear context with initial value
kv.init(addr hc.vtable)
result = (res == 1)
type ECDHEScheme* = EcCurveKind
proc ephemeral*(
scheme: ECDHEScheme,
rng: var HmacDrbgContext): EcResult[EcKeyPair] =
## Generate ephemeral keys used to perform ECDHE.
var keypair: EcKeyPair
if scheme == Secp256r1:
keypair = ? EcKeyPair.random(Secp256r1, rng)
elif scheme == Secp384r1:
keypair = ? EcKeyPair.random(Secp384r1, rng)
elif scheme == Secp521r1:
keypair = ? EcKeyPair.random(Secp521r1, rng)
ok(keypair)
proc ephemeral*(
scheme: string, rng: var HmacDrbgContext): EcResult[EcKeyPair] =
## Generate ephemeral keys used to perform ECDHE using string encoding.
##
## Currently supported encoding strings are P-256, P-384, P-521, if encoding
## string is not supported P-521 key will be generated.
if scheme == "P-256":
ephemeral(Secp256r1, rng)
elif scheme == "P-384":
ephemeral(Secp384r1, rng)
elif scheme == "P-521":
ephemeral(Secp521r1, rng)
else:
ephemeral(Secp521r1, rng)

View File

@@ -0,0 +1,47 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## `Perf <https://github.com/libp2p/specs/blob/master/perf/perf.md>`_ protocol specification
import chronos, chronicles, sequtils
import stew/endians2
import ./core, ../../stream/connection
logScope:
topics = "libp2p perf"
type PerfClient* = ref object of RootObj
proc perf*(_: typedesc[PerfClient], conn: Connection,
sizeToWrite: uint64 = 0, sizeToRead: uint64 = 0):
Future[Duration] {.async, public.} =
var
size = sizeToWrite
buf: array[PerfSize, byte]
let start = Moment.now()
trace "starting performance benchmark", conn, sizeToWrite, sizeToRead
await conn.write(toSeq(toBytesBE(sizeToRead)))
while size > 0:
let toWrite = min(size, PerfSize)
await conn.write(buf[0..<toWrite])
size -= toWrite
await conn.close()
size = sizeToRead
while size > 0:
let toRead = min(size, PerfSize)
await conn.readExactly(addr buf[0], toRead.int)
size = size - toRead
let duration = Moment.now() - start
trace "finishing performance benchmark", duration
return duration

View File

@@ -0,0 +1,14 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## `Perf <https://github.com/libp2p/specs/blob/master/perf/perf.md>`_ protocol specification
const
PerfCodec* = "/perf/1.0.0"
PerfSize* = 65536

View File

@@ -0,0 +1,60 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## `Perf <https://github.com/libp2p/specs/blob/master/perf/perf.md>`_ protocol specification
{.push raises: [].}
import chronos, chronicles
import stew/endians2
import ./core,
../protocol,
../../stream/connection,
../../utility
export chronicles, connection
logScope:
topics = "libp2p perf"
type Perf* = ref object of LPProtocol
proc new*(T: typedesc[Perf]): T {.public.} =
var p = T()
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var bytesRead = 0
try:
trace "Received benchmark performance check", conn
var
sizeBuffer: array[8, byte]
size: uint64
await conn.readExactly(addr sizeBuffer[0], 8)
size = uint64.fromBytesBE(sizeBuffer)
var toReadBuffer: array[PerfSize, byte]
try:
while true:
bytesRead += await conn.readOnce(addr toReadBuffer[0], PerfSize)
except CatchableError as exc:
discard
var buf: array[PerfSize, byte]
while size > 0:
let toWrite = min(size, PerfSize)
await conn.write(buf[0..<toWrite])
size -= toWrite
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in perf handler", exc = exc.msg, conn
await conn.close()
p.handler = handle
p.codec = PerfCodec
return p

View File

@@ -74,7 +74,9 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
behaviourPenaltyWeight: -1.0,
behaviourPenaltyDecay: 0.999,
disconnectBadPeers: false,
enablePX: false
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval
)
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -400,6 +402,9 @@ method rpcHandler*(g: GossipSub,
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)
# addSeen adds salt to msgId to avoid
# remote attacking the hash function
@@ -521,14 +526,16 @@ method publish*(g: GossipSub,
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic, limited
# to the amount of peers we can send it to in one heartbeat
let
bandwidth = 12_500_000 div 1000 # 100 Mbps or 12.5 MBps TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
maxPeersToFlod =
max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
var maxPeersToFlodOpt: Opt[int64]
if g.parameters.bandwidthEstimatebps > 0:
let
bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
msToTransmit = max(data.len div bandwidth, 1)
maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))
for peer in g.gossipsub.getOrDefault(topic):
if peers.len >= maxPeersToFlod: break
maxPeersToFlodOpt.withValue(maxPeersToFlod):
if peers.len >= maxPeersToFlod: break
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)

View File

@@ -253,7 +253,8 @@ proc handleIHave*(g: GossipSub,
if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIds:
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
res.messageIds.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
@@ -299,6 +300,17 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages
proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
@@ -704,3 +716,5 @@ proc heartbeat*(g: GossipSub) {.async.} =
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()
checkIWANTTimeouts(g, g.parameters.iwantTimeout)

View File

@@ -142,6 +142,9 @@ type
disconnectBadPeers*: bool
enablePX*: bool
bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration
BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
@@ -175,6 +178,7 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]
MeshMetrics* = object
# scratch buffers for metrics
@@ -185,3 +189,8 @@ type
lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64
IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment

View File

@@ -85,6 +85,9 @@ type
TopicHandler* {.public.} = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [].}
ValidationStrategy* {.pure, public.} = enum
Parallel, Sequential
ValidatorHandler* {.public.} = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, raises: [].}
@@ -109,7 +112,8 @@ type
triggerSelf*: bool ## trigger own local handler on publish
verifySignature*: bool ## enable signature verification
sign*: bool ## enable message signing
validators*: Table[string, HashSet[ValidatorHandler]]
validators*: Table[string, OrderedSet[ValidatorHandler]]
validationStrategy*: ValidationStrategy
observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
msgSeqno*: uint64
@@ -500,7 +504,7 @@ method addValidator*(p: PubSub,
## `Ignore` or `Reject` (which can descore the peer)
for t in topic:
trace "adding validator for topic", topicId = t
p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook)
p.validators.mgetOrPut(t, OrderedSet[ValidatorHandler]()).incl(hook)
method removeValidator*(p: PubSub,
topic: varargs[string],
@@ -513,26 +517,37 @@ method removeValidator*(p: PubSub,
method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
var pending: seq[Future[ValidationResult]]
trace "about to validate message"
for topic in message.topicIds:
trace "looking for validators on topic", topicId = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicId = topic
for validator in p.validators[topic]:
pending.add(validator(topic, message))
result = ValidationResult.Accept
let futs = await allFinished(pending)
for fut in futs:
if fut.failed:
result = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
if res == ValidationResult.Reject:
trace "about to validate message"
block outer:
for topic in message.topicIds:
trace "looking for validators on topic", topicId = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicId = topic
for validator in p.validators[topic]:
case p.validationStrategy
of ValidationStrategy.Parallel:
pending.add(validator(topic, message))
of ValidationStrategy.Sequential:
let validatorRes = await validator(topic, message)
# early break on first Reject/Ignore
if validatorRes != ValidationResult.Accept:
result = validatorRes
break outer
if p.validationStrategy == ValidationStrategy.Parallel:
let futs = await allFinished(pending)
for fut in futs:
if fut.failed:
result = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
if res == ValidationResult.Reject:
break
case result
of ValidationResult.Accept:
@@ -549,6 +564,7 @@ proc init*[PubParams: object | bool](
anonymize: bool = false,
verifySignature: bool = true,
sign: bool = true,
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
@@ -563,6 +579,7 @@ proc init*[PubParams: object | bool](
anonymize: anonymize,
verifySignature: verifySignature,
sign: sign,
validationStrategy: validationStrategy,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize,
@@ -575,6 +592,7 @@ proc init*[PubParams: object | bool](
anonymize: anonymize,
verifySignature: verifySignature,
sign: sign,
validationStrategy: validationStrategy,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,

View File

@@ -108,7 +108,7 @@ type
flags: set[ServerFlags]
handshakeTimeout: Duration
factories: seq[ExtFactory]
rng: Rng
rng: ref HmacDrbgContext
proc secure*(self: WsTransport): bool =
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))
@@ -327,7 +327,7 @@ proc new*(
tlsFlags: set[TLSFlags] = {},
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil,
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout): T {.public.} =
## Creates a secure WebSocket transport
@@ -346,7 +346,7 @@ proc new*(
upgrade: Upgrade,
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: Rng = nil,
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout): T {.public.} =
## Creates a clear-text WebSocket transport

View File

@@ -1,160 +0,0 @@
import stew/endians2, stew/byteutils, tables, strutils, os
import ../libp2p, ../libp2p/protocols/pubsub/rpc/messages
import ../libp2p/muxers/mplex/lpchannel, ../libp2p/protocols/ping
import chronos
import sequtils, hashes, math, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
from nativesockets import getHostname
const chunks = 1
proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
return ok(($m.data.hash).toBytes())
proc main {.async.} =
let
hostname = getHostname()
myId = parseInt(hostname[4..^1])
#publisherCount = client.param(int, "publisher_count")
publisherCount = 10
isPublisher = myId <= publisherCount
#isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
isAttacker = false
rng = libp2p.newRng()
#randCountry = rng.rand(distribCumSummed[^1])
#country = distribCumSummed.find(distribCumSummed.filterIt(it >= randCountry)[0])
let
address = initTAddress("0.0.0.0:5000")
switch =
SwitchBuilder
.new()
.withAddress(MultiAddress.init(address).tryGet())
.withRng(rng)
#.withYamux()
.withMplex()
.withMaxConnections(10000)
.withTcpTransport(flags = {ServerFlags.TcpNoDelay})
#.withPlainText()
.withNoise()
.build()
gossipSub = GossipSub.init(
switch = switch,
# triggerSelf = true,
msgIdProvider = msgIdProvider,
verifySignature = false,
anonymize = true,
)
pingProtocol = Ping.new(rng=rng)
gossipSub.parameters.floodPublish = false
#gossipSub.parameters.lazyPushThreshold = 1_000_000_000
#gossipSub.parameters.lazyPushThreshold = 0
gossipSub.parameters.opportunisticGraftThreshold = -10000
gossipSub.parameters.heartbeatInterval = 700.milliseconds
gossipSub.parameters.pruneBackoff = 3.seconds
gossipSub.parameters.gossipFactor = 0.05
gossipSub.parameters.d = 8
gossipSub.parameters.dLow = 6
gossipSub.parameters.dHigh = 12
gossipSub.parameters.dScore = 6
gossipSub.parameters.dOut = 6 div 2
gossipSub.parameters.dLazy = 6
gossipSub.topicParams["test"] = TopicParams(
topicWeight: 1,
firstMessageDeliveriesWeight: 1,
firstMessageDeliveriesCap: 30,
firstMessageDeliveriesDecay: 0.9
)
var messagesChunks: CountTable[uint64]
proc messageHandler(topic: string, data: seq[byte]) {.async.} =
let sentUint = uint64.fromBytesLE(data)
# warm-up
if sentUint < 1000000: return
#if isAttacker: return
messagesChunks.inc(sentUint)
if messagesChunks[sentUint] < chunks: return
let
sentMoment = nanoseconds(int64(uint64.fromBytesLE(data)))
sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds))
sentDate = initTime(sentMoment.seconds, sentNanosecs)
diff = getTime() - sentDate
echo sentUint, " milliseconds: ", diff.inMilliseconds()
var
startOfTest: Moment
attackAfter = 10000.hours
proc messageValidator(topic: string, msg: Message): Future[ValidationResult] {.async.} =
if isAttacker and Moment.now - startOfTest >= attackAfter:
return ValidationResult.Ignore
return ValidationResult.Accept
gossipSub.subscribe("test", messageHandler)
gossipSub.addValidator(["test"], messageValidator)
switch.mount(gossipSub)
switch.mount(pingProtocol)
await switch.start()
#TODO
#defer: await switch.stop()
echo "Listening on ", switch.peerInfo.addrs
echo myId, ", ", isPublisher, ", ", switch.peerInfo.peerId
var peersInfo = toSeq(1..parseInt(getEnv("PEERS")))
rng.shuffle(peersInfo)
proc pinger(peerId: PeerId) {.async.} =
try:
await sleepAsync(20.seconds)
while true:
let stream = await switch.dial(peerId, PingCodec)
let delay = await pingProtocol.ping(stream)
await stream.close()
#echo delay
await sleepAsync(delay)
except:
echo "Failed to ping"
let connectTo = parseInt(getEnv("CONNECTTO"))
var connected = 0
for peerInfo in peersInfo:
if connected >= connectTo: break
let tAddress = "peer" & $peerInfo & ":5000"
echo tAddress
let addrs = resolveTAddress(tAddress).mapIt(MultiAddress.init(it).tryGet())
try:
let peerId = await switch.connect(addrs[0], allowUnknownPeerId=true).wait(5.seconds)
#asyncSpawn pinger(peerId)
connected.inc()
except CatchableError as exc:
echo "Failed to dial", exc.msg
#let
# maxMessageDelay = client.param(int, "max_message_delay")
# warmupMessages = client.param(int, "warmup_messages")
#startOfTest = Moment.now() + milliseconds(warmupMessages * maxMessageDelay div 2)
await sleepAsync(10.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len
for msg in 0 ..< 10:#client.param(int, "message_count"):
await sleepAsync(12.seconds)
if msg mod publisherCount == myId - 1:
#if myId == 1:
let
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
#var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50)
#echo "sending ", uint64(nowInt.nanoseconds)
for chunk in 0..<chunks:
nowBytes[10] = byte(chunk)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)
#echo "BW: ", libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "in"]) + libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "out"])
#echo "DUPS: ", libp2p_gossipsub_duplicate.value(), " / ", libp2p_gossipsub_received.value()
waitFor(main())

View File

@@ -1,3 +0,0 @@
#!/bin/sh
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main && rm -rf shadow.data/ && shadow shadow.yaml && grep -rne 'milliseconds\|BW' shadow.data/ > latencies

File diff suppressed because it is too large Load Diff

View File

@@ -727,3 +727,101 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "two IHAVEs should generate only one IWANT":
let gossipSub = TestGossipSub.init(newStandardSwitch())
var iwantCount = 0
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
check false
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)
# Setup two connections and two peers
var ihaveMessageId: string
var firstPeer: PubSubPeer
let seqno = @[0'u8, 1, 2, 3]
for i in 0..<2:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
if isNil(firstPeer):
firstPeer = peer
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
peer.handler = handler
# Simulate that each peer sends an IHAVE message to our node
let msg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId.toBytes()]
)
let iwants = gossipSub.handleIHave(peer, @[msg])
if iwants.messageIds.len > 0:
iwantCount += 1
# Verify that our node responds with only one IWANT message
check: iwantCount == 1
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
# Simulate that our node receives the RPCMsg in response to the IWANT
let actualMessageData = "Hello, World!".toBytes
let rpcMsg = RPCMsg(
messages: @[Message(
fromPeer: firstPeer.peerId,
seqno: seqno,
data: actualMessageData
)]
)
await gossipSub.rpcHandler(firstPeer, rpcMsg)
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "handle unanswered IWANT messages":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.heartbeatInterval = 50.milliseconds
gossipSub.parameters.iwantTimeout = 10.milliseconds
await gossipSub.start()
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard
proc handler2(topic: string, data: seq[byte]) {.async.} = discard
let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)
# Setup a connection and a peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler
# Simulate that the peer sends an IHAVE message to our node
let ihaveMessageId = @[0'u8, 1, 2, 3]
let ihaveMsg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId]
)
discard gossipSub.handleIHave(peer, @[ihaveMsg])
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.0
await sleepAsync(60.milliseconds)
check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.1
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

View File

@@ -135,6 +135,137 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub with multiple validators (Sequential): early return":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
# define 3 validators executed sequentially
# validator1(Rejects)->validator2(Accepts)->validator3(Accepts)
var validatorFut1 = newFuture[bool]() # Reject
var validatorFut2 = newFuture[bool]() # Accept
var validatorFut3 = newFuture[bool]() # Accept
proc validator1(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Reject
validatorFut1.complete(true)
proc validator2(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut2.complete(true)
proc validator3(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut3.complete(true)
nodes[1].addValidator("foobar", validator1)
nodes[1].addValidator("foobar", validator2)
nodes[1].addValidator("foobar", validator3)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
# first failed, so the rest are not executed
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
check (await validatorFut2.withTimeout(chronos.seconds(1))) == false
check (await validatorFut3.withTimeout(chronos.seconds(1))) == false
# handler above verifies the message was not received
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub with multiple validators (Sequential): all valid":
var messageFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# if we complete the future, means the message arrives
messageFut.complete(true)
let
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
# define 3 validators that are executed sequentially
# validator1(Accept)->validator2(Accept)->validator3(Accept)
var validatorFut1 = newFuture[bool]() # Accept
var validatorFut2 = newFuture[bool]() # Accept
var validatorFut3 = newFuture[bool]() # Accept
proc validator1(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut1.complete(true)
proc validator2(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut2.complete(true)
proc validator3(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut3.complete(true)
nodes[1].addValidator("foobar", validator1)
nodes[1].addValidator("foobar", validator2)
nodes[1].addValidator("foobar", validator3)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
# all validators were executed
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
check (await validatorFut2.withTimeout(chronos.seconds(1))) == true
check (await validatorFut3.withTimeout(chronos.seconds(1))) == true
# message was indeed received by the subscriber
check (await messageFut.withTimeout(chronos.seconds(1))) == true
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail (ignore)":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
@@ -636,27 +767,31 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - GossipSub floodPublish limit":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
let
nodes = generateNodes(
20,
gossip = true)
# Helper procedures to avoid repetition
proc setupNodes(count: int): seq[PubSub] =
generateNodes(count, gossip = true)
proc startNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(
nodes.mapIt(it.switch.start())
)
var gossip1: GossipSub = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
proc stopNodes(nodes: seq[PubSub]) {.async.} =
await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)
for node in nodes[1..^1]:
proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
for node in nodes:
node.subscribe("foobar", handler)
await node.switch.connect(nodes[0].peerInfo.peerId, nodes[0].peerInfo.addrs)
await node.switch.connect(target.peerInfo.peerId, target.peerInfo.addrs)
proc baseTestProcedure(nodes: seq[PubSub], gossip1: GossipSub, numPeersFirstMsg: int, numPeersSecondMsg: int) {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
block setup:
for i in 0..<50:
@@ -665,20 +800,45 @@ suite "GossipSub":
await sleepAsync(10.milliseconds)
check false
check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == gossip1.parameters.dLow
check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == 17
check (await nodes[0].publish("foobar", newSeq[byte](2_500_000))) == numPeersFirstMsg
check (await nodes[0].publish("foobar", newSeq[byte](500_001))) == numPeersSecondMsg
# Now try with a mesh
gossip1.subscribe("foobar", handler)
checkExpiring: gossip1.mesh.peers("foobar") > 5
# use a different length so that the message is not equal to the last
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == 17
check (await nodes[0].publish("foobar", newSeq[byte](500_000))) == numPeersSecondMsg
await allFuturesThrowing(
nodes.mapIt(it.switch.stop())
)
# Actual tests
asyncTest "e2e - GossipSub floodPublish limit":
let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
await startNodes(nodes)
await connectNodes(nodes[1..^1], nodes[0])
await baseTestProcedure(nodes, gossip1, gossip1.parameters.dLow, 17)
await stopNodes(nodes)
asyncTest "e2e - GossipSub floodPublish limit with bandwidthEstimatebps = 0":
let
nodes = setupNodes(20)
gossip1 = GossipSub(nodes[0])
gossip1.parameters.floodPublish = true
gossip1.parameters.heartbeatInterval = milliseconds(700)
gossip1.parameters.bandwidthEstimatebps = 0
await startNodes(nodes)
await connectNodes(nodes[1..^1], nodes[0])
await baseTestProcedure(nodes, gossip1, nodes.len - 1, nodes.len - 1)
await stopNodes(nodes)
asyncTest "e2e - GossipSub with multiple peers":
var runs = 10

View File

@@ -41,6 +41,7 @@ proc generateNodes*(
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign,
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds,
maxMessageSize: int = 1024 * 1024,
@@ -54,6 +55,7 @@ proc generateNodes*(
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
validationStrategy = validationStrategy,
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,

View File

@@ -11,6 +11,7 @@
## Test vectors was made using Go implementation
## https://github.com/libp2p/go-libp2p-crypto/blob/master/key.go
from std/strutils import toUpper
import unittest2
import bearssl/hash
import nimcrypto/utils
@@ -382,6 +383,31 @@ suite "Key interface test suite":
toHex(checkseckey) == stripSpaces(PrivateKeys[i])
toHex(checkpubkey) == stripSpaces(PublicKeys[i])
test "Spec test vectors":
# https://github.com/libp2p/specs/pull/537
const keys = [
(private: "08031279307702010104203E5B1FE9712E6C314942A750BD67485DE3C1EFE85B1BFB520AE8F9AE3DFA4A4CA00A06082A8648CE3D030107A14403420004DE3D300FA36AE0E8F5D530899D83ABAB44ABF3161F162A4BC901D8E6ECDA020E8B6D5F8DA30525E71D6851510C098E5C47C646A597FB4DCEC034E9F77C409E62",
public: "0803125b3059301306072a8648ce3d020106082a8648ce3d03010703420004de3d300fa36ae0e8f5d530899d83abab44abf3161f162a4bc901d8e6ecda020e8b6d5f8da30525e71d6851510c098e5c47c646a597fb4dcec034e9f77c409e62"),
(private: "080112407e0830617c4a7de83925dfb2694556b12936c477a0e1feb2e148ec9da60fee7d1ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e",
public: "080112201ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e"),
(private: "0802122053DADF1D5A164D6B4ACDB15E24AA4C5B1D3461BDBD42ABEDB0A4404D56CED8FB",
public: "08021221037777e994e452c21604f91de093ce415f5432f701dd8cd1a7a6fea0e630bfca99"),
(private: "080012ae123082092a0201000282020100e1beab071d08200bde24eef00d049449b07770ff9910257b2d7d5dda242ce8f0e2f12e1af4b32d9efd2c090f66b0f29986dbb645dae9880089704a94e5066d594162ae6ee8892e6ec70701db0a6c445c04778eb3de1293aa1a23c3825b85c6620a2bc3f82f9b0c309bc0ab3aeb1873282bebd3da03c33e76c21e9beb172fd44c9e43be32e2c99827033cf8d0f0c606f4579326c930eb4e854395ad941256542c793902185153c474bed109d6ff5141ebf9cd256cf58893a37f83729f97e7cb435ec679d2e33901d27bb35aa0d7e20561da08885ef0abbf8e2fb48d6a5487047a9ecb1ad41fa7ed84f6e3e8ecd5d98b3982d2a901b4454991766da295ab78822add5612a2df83bcee814cf50973e80d7ef38111b1bd87da2ae92438a2c8cbcc70b31ee319939a3b9c761dbc13b5c086d6b64bf7ae7dacc14622375d92a8ff9af7eb962162bbddebf90acb32adb5e4e4029f1c96019949ecfbfeffd7ac1e3fbcc6b6168c34be3d5a2e5999fcbb39bba7adbca78eab09b9bc39f7fa4b93411f4cc175e70c0a083e96bfaefb04a9580b4753c1738a6a760ae1afd851a1a4bdad231cf56e9284d832483df215a46c1c21bdf0c6cfe951c18f1ee4078c79c13d63edb6e14feaeffabc90ad317e4875fe648101b0864097e998f0ca3025ef9638cd2b0caecd3770ab54a1d9c6ca959b0f5dcbc90caeefc4135baca6fd475224269bbe1b02030100010282020100a472ffa858efd8588ce59ee264b957452f3673acdf5631d7bfd5ba0ef59779c231b0bc838a8b14cae367b6d9ef572c03c7883b0a3c652f5c24c316b1ccfd979f13d0cd7da20c7d34d9ec32dfdc81ee7292167e706d705efde5b8f3edfcba41409e642f8897357df5d320d21c43b33600a7ae4e505db957c1afbc189d73f0b5d972d9aaaeeb232ca20eebd5de6fe7f29d01470354413cc9a0af1154b7af7c1029adcd67c74b4798afeb69e09f2cb387305e73a1b5f450202d54f0ef096fe1bde340219a1194d1ac9026e90b366cce0c59b239d10e4888f52ca1780824d39ae01a6b9f4dd6059191a7f12b2a3d8db3c2868cd4e5a5862b8b625a4197d52c6ac77710116ebd3ced81c4d91ad5fdfbed68312ebce7eea45c1833ca3acf7da2052820eacf5c6b07d086dabeb893391c71417fd8a4b1829ae2cf60d1749d0e25da19530d889461c21da3492a8dc6ccac7de83ac1c2185262c7473c8cc42f547cc9864b02a8073b6aa54a037d8c0de3914784e6205e83d97918b944f11b877b12084c0dd1d36592f8a4f8b8da5bb404c3d2c079b22b6ceabfbcb637c0dbe0201f0909d533f8bf308ada47aee641a012a494d31b54c974e58b87f140258258bb82f31692659db7aa07e17a5b2a0832c24e122d3a8babcc9ee74cbb07d3058bb85b15f6f6b2674aba9fd34367be9782d444335fbed31e3c4086c652597c27104938b47fa10282010100e9fdf843c1550070ca711cb8ff28411466198f0e212511c3186623890c0071bf6561219682fe7dbdfd81176eba7c4faba21614a20721e0fcd63768e6d925688ecc90992059ac89256e0524de90bf3d8a052ce6a9f6adafa712f3107a016e20c80255c9e37d8206d1bc327e06e66eb24288da866b55904fd8b59e6b2ab31bc5eab47e597093c63fab7872102d57b4c589c66077f534a61f5f65127459a33c91f6db61fc431b1ae90be92b4149a3255291baf94304e3efb77b1107b5a3bda911359c40a53c347ff9100baf8f36dc5cd991066b5bdc28b39ed644f404afe9213f4d31c9d4e40f3a5f5e3c39bebeb244e84137544e1a1839c1c8aaebf0c78a7fad590282010100f6fa1f1e6b803742d5490b7441152f500970f46feb0b73a6e4baba2aaf3c0e245ed852fc31d86a8e46eb48e90fac409989dfee45238f97e8f1f8e83a136488c1b04b8a7fb695f37b8616307ff8a8d63e8cfa0b4fb9b9167ffaebabf111aa5a4344afbabd002ae8961c38c02da76a9149abdde93eb389eb32595c29ba30d8283a7885218a5a9d33f7f01dbdf85f3aad016c071395491338ec318d39220e1c7bd69d3d6b520a13a30d745c102b827ad9984b0dd6aed73916ffa82a06c1c111e7047dcd2668f988a0570a71474992eecf416e068f029ec323d5d635fd24694fc9bf96973c255d26c772a95bf8b7f876547a5beabf86f06cd21b67994f944e7a5493028201010095b02fd30069e547426a8bea58e8a2816f33688dac6c6f6974415af8402244a22133baedf34ce499d7036f3f19b38eb00897c18949b0c5a25953c71aeeccfc8f6594173157cc854bd98f16dffe8f28ca13b77eb43a2730585c49fc3f608cd811bb54b03b84bddaa8ef910988567f783012266199667a546a18fd88271fbf63a45ae4fd4884706da8befb9117c0a4d73de5172f8640b1091ed8a4aea3ed4641463f5ff6a5e3401ad7d0c92811f87956d1fd5f9a1d15c7f3839a08698d9f35f9d966e5000f7cb2655d7b6c4adcd8a9d950ea5f61bb7c9a33c17508f9baa313eecfee4ae493249ebe05a5d7770bbd3551b2eeb752e3649e0636de08e3d672e66cb90282010100ad93e4c31072b063fc5ab5fe22afacece775c795d0efdf7c704cfc027bde0d626a7646fc905bb5a80117e3ca49059af14e0160089f9190065be9bfecf12c3b2145b211c8e89e42dd91c38e9aa23ca73697063564f6f6aa6590088a738722df056004d18d7bccac62b3bafef6172fc2a4b071ea37f31eff7a076bcab7dd144e51a9da8754219352aef2c73478971539fa41de4759285ea626fa3c72e7085be47d554d915bbb5149cb6ef835351f231043049cd941506a034bf2f8767f3e1e42ead92f91cb3d75549b57ef7d56ac39c2d80d67f6a2b4ca192974bfc5060e2dd171217971002193dba12e7e4133ab201f07500a90495a38610279b13a48d54f0c99028201003e3a1ac0c2b67d54ed5c4bbe04a7db99103659d33a4f9d35809e1f60c282e5988dddc964527f3b05e6cc890eab3dcb571d66debf3a5527704c87264b3954d7265f4e8d2c637dd89b491b9cf23f264801f804b90454d65af0c4c830d1aef76f597ef61b26ca857ecce9cb78d4f6c2218c00d2975d46c2b013fbf59b750c3b92d8d3ed9e6d1fd0ef1ec091a5c286a3fe2dead292f40f380065731e2079ebb9f2a7ef2c415ecbb488da98f3a12609ca1b6ec8c734032c8bd513292ff842c375d4acd1b02dfb206b24cd815f8e2f9d4af8e7dea0370b19c1b23cc531d78b40e06e1119ee2e08f6f31c6e2e8444c568d13c5d451a291ae0c9f1d4f27d23b3a00d60ad",
public: "080012a60430820222300d06092a864886f70d01010105000382020f003082020a0282020100e1beab071d08200bde24eef00d049449b07770ff9910257b2d7d5dda242ce8f0e2f12e1af4b32d9efd2c090f66b0f29986dbb645dae9880089704a94e5066d594162ae6ee8892e6ec70701db0a6c445c04778eb3de1293aa1a23c3825b85c6620a2bc3f82f9b0c309bc0ab3aeb1873282bebd3da03c33e76c21e9beb172fd44c9e43be32e2c99827033cf8d0f0c606f4579326c930eb4e854395ad941256542c793902185153c474bed109d6ff5141ebf9cd256cf58893a37f83729f97e7cb435ec679d2e33901d27bb35aa0d7e20561da08885ef0abbf8e2fb48d6a5487047a9ecb1ad41fa7ed84f6e3e8ecd5d98b3982d2a901b4454991766da295ab78822add5612a2df83bcee814cf50973e80d7ef38111b1bd87da2ae92438a2c8cbcc70b31ee319939a3b9c761dbc13b5c086d6b64bf7ae7dacc14622375d92a8ff9af7eb962162bbddebf90acb32adb5e4e4029f1c96019949ecfbfeffd7ac1e3fbcc6b6168c34be3d5a2e5999fcbb39bba7adbca78eab09b9bc39f7fa4b93411f4cc175e70c0a083e96bfaefb04a9580b4753c1738a6a760ae1afd851a1a4bdad231cf56e9284d832483df215a46c1c21bdf0c6cfe951c18f1ee4078c79c13d63edb6e14feaeffabc90ad317e4875fe648101b0864097e998f0ca3025ef9638cd2b0caecd3770ab54a1d9c6ca959b0f5dcbc90caeefc4135baca6fd475224269bbe1b0203010001"),
]
for (private, public) in keys:
var seckey = PrivateKey.init(fromHex(private)).expect("private key")
var pubkey = PublicKey.init(fromHex(public)).expect("public key")
var calckey = seckey.getPublicKey().expect("public key")
check:
pubkey == calckey
var checkseckey = seckey.getBytes().expect("private key")
var checkpubkey = pubkey.getBytes().expect("public key")
check:
toHex(checkseckey) == stripSpaces(private).toUpper()
toHex(checkpubkey) == stripSpaces(public).toUpper()
test "Generate/Sign/Serialize/Deserialize/Verify test":
var msg = "message to sign"
var bmsg = cast[seq[byte]](msg)